[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

2022-12-09 Thread GitBox


wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1044596644


##
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##
@@ -674,22 +674,30 @@ private[spark] class AppStatusListener(
       delta
     }.orNull

-    val (completedDelta, failedDelta, killedDelta) = event.reason match {
+    // SPARK-41187: For `SparkListenerTaskEnd` with `Resubmitted` reason, which is raised by
+    // executor lost, it can lead to negative `LiveStage.activeTasks` since there's no
+    // corresponding `SparkListenerTaskStart` event for each of them. The negative activeTasks
+    // will make the stage always remain in the live stage list as it can never meet the
+    // condition activeTasks == 0. This in turn causes the dead executor to never be cleaned up
+    // if that live stage's submissionTime is less than the dead executor's removeTime.
+    val (completedDelta, failedDelta, killedDelta, activeTasksDelta) = event.reason match {

Review Comment:
   fixed
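
   For context, a minimal sketch of how the four-element match can treat `Resubmitted` (the deltas shown here are assumptions for illustration, not necessarily the exact values merged in the PR):
   ``` scala
// Sketch only: Resubmitted is counted as a failure but contributes no activeTasks
// delta, because no corresponding SparkListenerTaskStart was emitted for it.
val (completedDelta, failedDelta, killedDelta, activeTasksDelta) = event.reason match {
  case Success =>
    (1, 0, 0, 1)
  case _: TaskKilled =>
    (0, 0, 1, 1)
  case _: TaskCommitDenied =>
    (0, 0, 1, 1)
  case Resubmitted =>
    (0, 1, 0, 0)  // assumption: count as failed, leave activeTasks untouched
  case _ =>
    (0, 1, 0, 1)
}
   ```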






[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

2022-12-06 Thread GitBox


wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1041671057


##
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }

   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?
+    // For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+    // will make activeTask in stage to be negative, this will cause stage not be removed in
+    // liveStages, and finally cause executor not removed in deadExecutors
+    if (event.taskInfo == null ||  event.reason == Resubmitted) {

Review Comment:
   OK, so let's focus on fixing the active tasks first and let the failed counters go up. I have changed the PR; could you help check it @mridulm @Ngone51?






[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

2022-12-04 Thread GitBox


wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1039103663


##
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }

   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?
+    // For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+    // will make activeTask in stage to be negative, this will cause stage not be removed in
+    // liveStages, and finally cause executor not removed in deadExecutors
+    if (event.taskInfo == null ||  event.reason == Resubmitted) {

Review Comment:
   @cloud-fan @Ngone51 @mridulm any advice for the metrics and counters? 






[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

2022-11-30 Thread GitBox


wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1035584393


##
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }

   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?
+    // For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+    // will make activeTask in stage to be negative, this will cause stage not be removed in
+    // liveStages, and finally cause executor not removed in deadExecutors
+    if (event.taskInfo == null ||  event.reason == Resubmitted) {

Review Comment:
   > Couple of things to clarify (the pr description/comments confused me a bit).
   > 
   > a) There was a pair of task start and task end events which were fired for the task (let us call it Tr). b) When the executor which ran Tr was lost, while the stage was still running, a `Resubmitted` is fired for Tr. c) Subsequently, a new task start and task end will be fired for the retry of Tr.
   > 
   > The issue here is, (b) is associated with (a) - which was a success earlier, but has now been marked as a failure.
   > 
   > We should not be ignoring this event, but rather deal with it properly. For example, update counters, number of failed tasks, etc.
   
   The steps are right, and in my opinion the task end event in (b) with the `Resubmitted` reason can be ignored in AppStatusListener; ignoring it also keeps the logic clear.
   
   In (a), the counters and metrics are already recorded for Tr, e.g. stage.completedTasks, stage.job.completedTasks, stage.executorSummary.taskTime and so on.
   In (b), we get an extra task end with the `Resubmitted` reason for Tr, so Tr has one start event with two end events. The activeTasks counter in the stage goes wrong, which is the root cause of this JIRA.
   In (c), the counters and metrics will be recorded for the retry of Tr.
   
   For the metrics, if we handled the task end message in (b), it would be redundant and produce wrong values; for example stage.executorSummary.taskTime would be added again, so we would need to exclude it in the `Resubmitted` situation.
   
   And for the counters, the one that needs to be discussed is `failedTasks`. I think both of the following interpretations make sense:
   1. (completedTasks, failedTasks, killedTasks) = (2, 0, 0), which means Tr succeeded twice with no failure. Resubmitted is really just a special signal: it reruns a successful task, and no task actually failed.
   2. (completedTasks, failedTasks, killedTasks) = (2, 1, 0), which means Tr succeeded twice and we count the resubmission as a failure. But then it would be confused with tasks that really failed, like the tasks that were still running when the executor was lost; those are the truly failed ones.
   
   Am I making sense?
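   
   To make the bookkeeping in (b) concrete, here is a minimal illustrative sketch (not actual Spark code) of how one start event plus two end events drives the stage's activeTasks negative:
   ``` scala
// Illustrative only: mirrors the per-event bookkeeping AppStatusListener does for Tr.
var activeTasks = 0
activeTasks += 1  // SparkListenerTaskStart for Tr
activeTasks -= 1  // SparkListenerTaskEnd for Tr, reason = Success      -> 0
activeTasks -= 1  // SparkListenerTaskEnd for Tr, reason = Resubmitted  -> -1
// activeTasks never returns to 0, so the stage can never be removed from liveStages.
   ```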
   
   
   
   






[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

2022-11-29 Thread GitBox


wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1034721151


##
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }

   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?
+    // For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+    // will make activeTask in stage to be negative, this will cause stage not be removed in
+    // liveStages, and finally cause executor not removed in deadExecutors
+    if (event.taskInfo == null ||  event.reason == Resubmitted) {

Review Comment:
   > to help understand the code more, do you know when/where the scheduler handles resubmitted tasks?
   
   1. First, in TaskSetManager.scala, we handle the executor lost:
   ``` scala
  override def executorLost(execId: String, host: String, reason: ExecutorLossReason): Unit = {
    // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage,
    // and we are not using an external shuffle server which could serve the shuffle outputs.
    // The reason is the next stage wouldn't be able to fetch the data from this dead executor
    // so we would need to rerun these tasks on other executors.
    if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && !isZombie) {
      for ((tid, info) <- taskInfos if info.executorId == execId) {
        val index = info.index
        // We may have a running task whose partition has been marked as successful,
        // this partition has another task completed in another stage attempt.
        // We treat it as a running task and will call handleFailedTask later.
        if (successful(index) && !info.running && !killedByOtherAttempt.contains(tid)) {
          successful(index) = false
          copiesRunning(index) -= 1
          tasksSuccessful -= 1
          addPendingTask(index)
          // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
          // stage finishes when a total of tasks.size tasks finish.
          sched.dagScheduler.taskEnded(
            tasks(index), Resubmitted, null, Seq.empty, Array.empty, info)
        }
      }
    }
   ```
   2. The successful task is added back to the queue to re-execute in `addPendingTask`.
   3. `sched.dagScheduler.taskEnded` is called to tell the DAGScheduler to handle this; it posts a `CompletionEvent` to the eventProcessLoop.
   4. The function `handleTaskCompletion` in DAGScheduler.scala handles this message. Since the reason `Resubmitted` is a kind of failure, it calls `handleResubmittedFailure`, which adds the task's partitionId to the ShuffleMapStage's pendingPartitions. Thus the stage will wait for this resubmitted task to finish again.
   ``` scala
  private def handleResubmittedFailure(task: Task[_], stage: Stage): Unit = {
    logInfo(s"Resubmitted $task, so marking it as still running.")
    stage match {
      case sms: ShuffleMapStage =>
        sms.pendingPartitions += task.partitionId

      case _ =>
        throw SparkCoreErrors.sendResubmittedTaskStatusForShuffleMapStagesOnlyError()
    }
  }
   ```
   






[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

2022-11-29 Thread GitBox


wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1034824042


##
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
     checkInfoPopulated(listener, logUrlMap, processId)
   }

+  test(s"Stage should be removed from liveStages to avoid deadExecutors accumulated") {
+
+    val listener = new AppStatusListener(store, conf, true)
+
+    listener.onExecutorAdded(createExecutorAddedEvent(1))
+    listener.onExecutorAdded(createExecutorAddedEvent(2))
+    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+
+    time += 1
+    stage.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+    val tasks = createTasks(2, Array("1", "2"))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+    }
+
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Success, tasks(0), new ExecutorMetrics, null))
+
+    // executor lost, success task will be resubmitted

Review Comment:
   > Could you copy-paste your analysis in JIRA to the PR description to elaborate the issue more clearly?
   
   Done, I used the description from your suggestion; I think it's clearer. And by the way, I committed the suggestions one by one, which caused too many commits. Could I rebase them into one and force push it?






[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

2022-11-29 Thread GitBox


wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1034798225


##
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }

   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?
+    // For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+    // will make activeTask in stage to be negative, this will cause stage not be removed in
+    // liveStages, and finally cause executor not removed in deadExecutors

Review Comment:
   You made it clearer, great.









[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

2022-11-29 Thread GitBox


wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1034713383


##
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
     checkInfoPopulated(listener, logUrlMap, processId)
   }

+  test(s"Stage should be removed from liveStages to avoid deadExecutors accumulated") {
+
+    val listener = new AppStatusListener(store, conf, true)
+
+    listener.onExecutorAdded(createExecutorAddedEvent(1))
+    listener.onExecutorAdded(createExecutorAddedEvent(2))
+    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+
+    time += 1
+    stage.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+    val tasks = createTasks(2, Array("1", "2"))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+    }
+
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Success, tasks(0), new ExecutorMetrics, null))
+
+    // executor lost, success task will be resubmitted

Review Comment:
   OK~ 






[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

2022-11-28 Thread GitBox


wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1034309933


##
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
     checkInfoPopulated(listener, logUrlMap, processId)
   }

+  test(s"Stage should be removed from liveStages to avoid deadExecutors accumulated") {
+
+    val listener = new AppStatusListener(store, conf, true)
+
+    listener.onExecutorAdded(createExecutorAddedEvent(1))
+    listener.onExecutorAdded(createExecutorAddedEvent(2))
+    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+
+    time += 1
+    stage.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+    val tasks = createTasks(2, Array("1", "2"))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+    }
+
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Success, tasks(0), new ExecutorMetrics, null))
+
+    // executor lost, success task will be resubmitted

Review Comment:
   The reason the resubmitted event is sent is explained in this comment in TaskSetManager.scala:
   
       // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
       // stage finishes when a total of tasks.size tasks finish.
   
   The successful task will be rerun, and the DAGScheduler needs this event to know the map stage is not finished.









[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

2022-11-28 Thread GitBox


wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1034309587


##
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
     checkInfoPopulated(listener, logUrlMap, processId)
   }

+  test(s"Stage should be removed from liveStages to avoid deadExecutors accumulated") {
+
+    val listener = new AppStatusListener(store, conf, true)
+
+    listener.onExecutorAdded(createExecutorAddedEvent(1))
+    listener.onExecutorAdded(createExecutorAddedEvent(2))
+    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+
+    time += 1
+    stage.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+    val tasks = createTasks(2, Array("1", "2"))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+    }
+
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Success, tasks(0), new ExecutorMetrics, null))
+
+    // executor lost, success task will be resubmitted

Review Comment:
   Please check the JIRA: https://issues.apache.org/jira/browse/SPARK-41187. For shuffle map stage tasks, if an executor lost happens, the finished tasks will be resubmitted, and a taskEnd message with reason "Resubmitted" is sent out in TaskSetManager.scala. This causes the activeTasks counter in AppStatusListener's liveStage to become negative.
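   
   For context, a sketch of the retention check this leak hinges on (names assumed from AppStatusListener; the real helper may differ slightly). A dead executor is kept as long as some live stage was submitted before the executor's removal time, so a stage stuck in liveStages pins dead executors forever:
   ``` scala
// Assumed shape of the check in AppStatusListener (illustrative only):
private def isExecutorActiveForLiveStages(exec: LiveExecutor): Boolean = {
  liveStages.values.asScala.exists { stage =>
    stage.info.submissionTime.getOrElse(0L) < exec.removeTime.getTime
  }
}
   ```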


