[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen
wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1044596644

## core/src/main/scala/org/apache/spark/status/AppStatusListener.scala: ##
@@ -674,22 +674,30 @@ private[spark] class AppStatusListener(
       delta
     }.orNull

-    val (completedDelta, failedDelta, killedDelta) = event.reason match {
+    // SPARK-41187: For `SparkListenerTaskEnd` with `Resubmitted` reason, which is raised by
+    // executor lost, it can lead to negative `LiveStage.activeTasks` since there's no
+    // corresponding `SparkListenerTaskStart` event for each of them. The negative activeTasks
+    // will make the stage always remains in the live stage list as it can never meet the
+    // condition activeTasks == 0. This in turn causes the dead executor to never be retained
+    // if that live stage's submissionTime is less than the dead executor's removeTime.
+    val (completedDelta, failedDelta, killedDelta, activeTasksDelta) = event.reason match {

Review Comment:
fixed

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
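The four-element tuple in the hunk above can be read as "three outcome counters plus how much to subtract from the stage's active task count". The following is a minimal sketch of that idea, not the code merged in the PR. The `EndReason` hierarchy, the object name `TaskEndDeltas`, and the exact tuple values are assumptions chosen for illustration; the only points taken from the thread are that `Resubmitted` still counts as a failure but must not decrement the active task counter.

```scala
object TaskEndDeltas {
  // Simplified stand-ins for task end reasons (hypothetical, not Spark's classes).
  sealed trait EndReason
  case object Success extends EndReason
  case object Killed extends EndReason
  case object Resubmitted extends EndReason
  case object OtherFailure extends EndReason

  // Returns (completedDelta, failedDelta, killedDelta, activeTasksDelta).
  // Assumption: Resubmitted has no matching task start, so its activeTasksDelta is 0.
  def deltas(reason: EndReason): (Int, Int, Int, Int) = reason match {
    case Success      => (1, 0, 0, 1)
    case Killed       => (0, 0, 1, 1)
    case Resubmitted  => (0, 1, 0, 0)
    case OtherFailure => (0, 1, 0, 1)
  }

  // Active task count after `started` task starts followed by the given task ends.
  def activeTasksAfter(started: Int, ends: Seq[EndReason]): Int =
    ends.foldLeft(started)((active, reason) => active - deltas(reason)._4)
}
```

With one started task that first succeeds and is then reported as `Resubmitted`, `activeTasksAfter(1, Seq(Success, Resubmitted))` stays at zero instead of dropping below it.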
[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen
wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1041671057

## core/src/main/scala/org/apache/spark/status/AppStatusListener.scala: ##
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }

   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?
+    // For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+    // will make activeTask in stage to be negative, this will cause stage not be removed in
+    // liveStages, and finally cause executor not removed in deadExecutors
+    if (event.taskInfo == null || event.reason == Resubmitted) {

Review Comment:
OK, so let's focus on fixing the active tasks first and let the failed counters go up. I have changed the PR; could you help check it, @mridulm @Ngone51?
[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen
wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1039103663

## core/src/main/scala/org/apache/spark/status/AppStatusListener.scala: ##
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }

   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?
+    // For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+    // will make activeTask in stage to be negative, this will cause stage not be removed in
+    // liveStages, and finally cause executor not removed in deadExecutors
+    if (event.taskInfo == null || event.reason == Resubmitted) {

Review Comment:
@cloud-fan @Ngone51 @mridulm any advice on the metrics and counters?
[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen
wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1035584393

## core/src/main/scala/org/apache/spark/status/AppStatusListener.scala: ##
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }

   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?
+    // For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+    // will make activeTask in stage to be negative, this will cause stage not be removed in
+    // liveStages, and finally cause executor not removed in deadExecutors
+    if (event.taskInfo == null || event.reason == Resubmitted) {

Review Comment:
> Couple of things to clarify (the pr description/comments confused me a bit).
>
> a) There was a pair of task start and task end event which were fired for the task (let us call it Tr)
> b) When executor which ran Tr was lost, while stage is still running, a `Resubmitted` is fired for Tr.
> c) Subsequently, a new task start and task end will be fired for the retry of Tr.
>
> The issue here is, (b) is associated with (a) - which was a success earlier, but has now been marked as a failure.
>
> We should not be ignoring this event, but rather deal with it properly. For example, update counters, number of failed tasks, etc.

The steps are right. In my opinion, the task end event in (b) with the `Resubmitted` reason can be ignored in AppStatusListener; ignoring it also keeps the logic clear.

In (a), the counters and metrics are already recorded for Tr, e.g. stage.completedTasks, stage.job.completedTasks, stage.executorSummary.taskTime and so on.
In (b), we get an extra task end with the `Resubmitted` reason for Tr, so Tr has one start event but two end events. The activeTasks counter in the stage then becomes wrong, which is the root cause of this JIRA.
In (c), the counters and metrics are recorded for the retry of Tr.

For the metrics, handling the task end message in (b) would be redundant and would produce wrong values. For example, stage.executorSummary.taskTime would be added again, so the metrics would need to be excluded in the `Resubmitted` case.

For the counters, the one that needs discussion is `failedTasks`. I think either of the following two outcomes makes sense:
1. (completedTasks, failedTasks, killedTasks) = (2, 0, 0), which means Tr succeeded twice with no failure. `Resubmitted` is really a specific signal: it just reruns a successful task, and no task actually failed.
2. (completedTasks, failedTasks, killedTasks) = (2, 1, 0), which means Tr succeeded twice and we treat the resubmission as a failure. But this can be confused with tasks that really failed, such as the tasks that were running when the executor was lost; those are the real failures.

Am I making sense?
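The (a), (b), (c) sequence discussed above can be replayed against a toy counter model to see where the active task count goes wrong. This is only an illustrative sketch: the `Event` types, the `Counters` case class, and the `ignoreResubmitted` flag are hypothetical names, and the model tracks a single stage with none of the real listener's metrics.

```scala
object ResubmitReplay {
  // Hypothetical, simplified listener events for a single stage.
  sealed trait Event
  case object TaskStart extends Event
  case object TaskEndSuccess extends Event
  case object TaskEndResubmitted extends Event

  final case class Counters(active: Int = 0, completed: Int = 0, failed: Int = 0)

  // Replays the events in order. When `ignoreResubmitted` is true the extra
  // task end is skipped entirely; otherwise it is treated like any failed task end.
  def replay(events: Seq[Event], ignoreResubmitted: Boolean): Counters =
    events.foldLeft(Counters()) {
      case (c, TaskStart) =>
        c.copy(active = c.active + 1)
      case (c, TaskEndSuccess) =>
        c.copy(active = c.active - 1, completed = c.completed + 1)
      case (c, TaskEndResubmitted) =>
        if (ignoreResubmitted) c
        else c.copy(active = c.active - 1, failed = c.failed + 1)
    }

  // (a) Tr starts and succeeds, (b) the executor is lost and Tr is reported
  // as Resubmitted, (c) the retry of Tr starts and succeeds.
  val scenario: Seq[Event] =
    Seq(TaskStart, TaskEndSuccess, TaskEndResubmitted, TaskStart, TaskEndSuccess)
}
```

Replaying `scenario` with `ignoreResubmitted = false` ends with `active = -1`, while skipping the event ends with `active = 0` and the (2, 0, 0) outcome from option 1. Option 2 would correspond to a third variant that bumps `failed` without touching `active`.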
[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen
wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1034721151

## core/src/main/scala/org/apache/spark/status/AppStatusListener.scala: ##
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }

   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?
+    // For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+    // will make activeTask in stage to be negative, this will cause stage not be removed in
+    // liveStages, and finally cause executor not removed in deadExecutors
+    if (event.taskInfo == null || event.reason == Resubmitted) {

Review Comment:
> to help understand the code more, do you know when/where the scheduler handles resubmitted tasks?

1. First, in TaskSetManager.scala, we handle the executor loss:
``` scala
override def executorLost(execId: String, host: String, reason: ExecutorLossReason): Unit = {
  // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage,
  // and we are not using an external shuffle server which could serve the shuffle outputs.
  // The reason is the next stage wouldn't be able to fetch the data from this dead executor
  // so we would need to rerun these tasks on other executors.
  if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && !isZombie) {
    for ((tid, info) <- taskInfos if info.executorId == execId) {
      val index = info.index
      // We may have a running task whose partition has been marked as successful,
      // this partition has another task completed in another stage attempt.
      // We treat it as a running task and will call handleFailedTask later.
      if (successful(index) && !info.running && !killedByOtherAttempt.contains(tid)) {
        successful(index) = false
        copiesRunning(index) -= 1
        tasksSuccessful -= 1
        addPendingTask(index)
        // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
        // stage finishes when a total of tasks.size tasks finish.
        sched.dagScheduler.taskEnded(
          tasks(index), Resubmitted, null, Seq.empty, Array.empty, info)
      }
    }
  }
  // (remainder of the method omitted from this excerpt)
}
```
2. The successful task is added back to the queue for re-execution by `addPendingTask`.
3. `sched.dagScheduler.taskEnded` is called to notify the DAGScheduler; this posts a `CompletionEvent` to eventProcessLoop.
4. The function `handleTaskCompletion` in DAGScheduler.scala handles this message. Because the `Resubmitted` reason is treated as a kind of failure, it calls `handleResubmittedFailure`, which adds the task's partitionId to the ShuffleMapStage's pendingPartitions. The stage therefore waits for the resubmitted task to finish again.
``` scala
private def handleResubmittedFailure(task: Task[_], stage: Stage): Unit = {
  logInfo(s"Resubmitted $task, so marking it as still running.")
  stage match {
    case sms: ShuffleMapStage =>
      sms.pendingPartitions += task.partitionId

    case _ =>
      throw SparkCoreErrors.sendResubmittedTaskStatusForShuffleMapStagesOnlyError()
  }
}
```
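The four steps above can be condensed into a small standalone model, which may help when reasoning about which tasks trigger a `Resubmitted` notification. Everything here is a hypothetical simplification: `MiniTaskSet`, `TaskInfo`, and `handleResubmitted` are illustrative names, and the model leaves out the external shuffle service check, zombie task sets, `copiesRunning`, and speculative attempts.

```scala
import scala.collection.mutable

object ResubmitFlowSketch {
  // Hypothetical, stripped-down task bookkeeping record.
  final case class TaskInfo(index: Int, executorId: String, running: Boolean)

  final class MiniTaskSet(numTasks: Int) {
    val successful: Array[Boolean] = Array.fill(numTasks)(false)
    var tasksSuccessful: Int = 0
    val pendingTasks: mutable.Queue[Int] = mutable.Queue.empty[Int]
    val taskInfos: mutable.Map[Long, TaskInfo] = mutable.Map.empty[Long, TaskInfo]

    def markSuccess(tid: Long, info: TaskInfo): Unit = {
      taskInfos(tid) = info.copy(running = false)
      successful(info.index) = true
      tasksSuccessful += 1
    }

    // Steps 1 and 2: finished tasks that ran on the lost executor are marked
    // unsuccessful and re-queued. Returns the partition indices for which a
    // Resubmitted notification would be sent (step 3).
    def executorLost(execId: String): Seq[Int] = {
      val lost = taskInfos.values.toSeq
        .filter(i => i.executorId == execId && successful(i.index) && !i.running)
        .sortBy(_.index)
      lost.foreach { i =>
        successful(i.index) = false
        tasksSuccessful -= 1
        pendingTasks.enqueue(i.index)
      }
      lost.map(_.index)
    }
  }

  // Step 4: the stage records the partition as pending again.
  def handleResubmitted(pendingPartitions: Set[Int], partition: Int): Set[Int] =
    pendingPartitions + partition
}
```

In this model, losing an executor that hosted one finished task returns that task's index, lowers `tasksSuccessful` by one, and leaves the index in `pendingTasks`, mirroring why the stage cannot be considered finished until the rerun completes.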
[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen
wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1034824042

## core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala: ##
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
     checkInfoPopulated(listener, logUrlMap, processId)
   }

+  test(s"Stage should be removed from liveStages to avoid deadExecutors accumulated") {
+
+    val listener = new AppStatusListener(store, conf, true)
+
+    listener.onExecutorAdded(createExecutorAddedEvent(1))
+    listener.onExecutorAdded(createExecutorAddedEvent(2))
+    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+
+    time += 1
+    stage.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+    val tasks = createTasks(2, Array("1", "2"))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+    }
+
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Success, tasks(0), new ExecutorMetrics, null))
+
+    // executor lost, success task will be resubmitted

Review Comment:
> Could you copy-paste your analysis in JIRA to the PR description to elaborate the issue more clearly?

Done. I used the description from your suggestion; I think it is clearer. By the way, I committed the suggestions one by one, which produced too many commits. Could I rebase them into one commit and force-push it?
[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen
wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1034798225

## core/src/main/scala/org/apache/spark/status/AppStatusListener.scala: ##
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }

   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?
+    // For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+    // will make activeTask in stage to be negative, this will cause stage not be removed in
+    // liveStages, and finally cause executor not removed in deadExecutors

Review Comment:
>

You made it clearer, great.
[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen
wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1034713383

## core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala: ##
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
     checkInfoPopulated(listener, logUrlMap, processId)
   }

+  test(s"Stage should be removed from liveStages to avoid deadExecutors accumulated") {
+
+    val listener = new AppStatusListener(store, conf, true)
+
+    listener.onExecutorAdded(createExecutorAddedEvent(1))
+    listener.onExecutorAdded(createExecutorAddedEvent(2))
+    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+
+    time += 1
+    stage.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+    val tasks = createTasks(2, Array("1", "2"))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+    }
+
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Success, tasks(0), new ExecutorMetrics, null))
+
+    // executor lost, success task will be resubmitted

Review Comment:
OK~
[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen
wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1034309933

## core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala: ##
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
     checkInfoPopulated(listener, logUrlMap, processId)
   }

+  test(s"Stage should be removed from liveStages to avoid deadExecutors accumulated") {
+
+    val listener = new AppStatusListener(store, conf, true)
+
+    listener.onExecutorAdded(createExecutorAddedEvent(1))
+    listener.onExecutorAdded(createExecutorAddedEvent(2))
+    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+
+    time += 1
+    stage.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+    val tasks = createTasks(2, Array("1", "2"))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+    }
+
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Success, tasks(0), new ExecutorMetrics, null))
+
+    // executor lost, success task will be resubmitted

Review Comment:
The reason the `Resubmitted` event is sent is given in this comment in TaskSetManager.scala:

// Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
// stage finishes when a total of tasks.size tasks finish.

The successful task will be rerun, and the DAGScheduler needs this event to know that the map stage is not finished.
[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen
wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1034309587

## core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala: ##
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
     checkInfoPopulated(listener, logUrlMap, processId)
   }

+  test(s"Stage should be removed from liveStages to avoid deadExecutors accumulated") {
+
+    val listener = new AppStatusListener(store, conf, true)
+
+    listener.onExecutorAdded(createExecutorAddedEvent(1))
+    listener.onExecutorAdded(createExecutorAddedEvent(2))
+    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+
+    time += 1
+    stage.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+    val tasks = createTasks(2, Array("1", "2"))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+    }
+
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Success, tasks(0), new ExecutorMetrics, null))
+
+    // executor lost, success task will be resubmitted

Review Comment:
Please check the JIRA: https://issues.apache.org/jira/browse/SPARK-41187. For the tasks of a shuffle map stage, if an executor is lost, the finished tasks are resubmitted and TaskSetManager.scala sends out a task end message with reason "Resubmitted". This causes activeTasks in AppStatusListener's live stage to become negative.
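The link between a negative active task count and the leaked dead executors can be sketched as two filters. This is a simplified illustration built only from the conditions stated in the thread (a stage leaves the live list when activeTasks == 0, and a dead executor is kept while some live stage's submissionTime is less than the executor's removeTime). The names `LiveStage`, `DeadExecutor`, `stillLive`, and `retained` are hypothetical, and the model assumes every stage passed in has already completed.

```scala
object DeadExecutorSketch {
  // Hypothetical minimal records; the real listener tracks far more state.
  final case class LiveStage(stageId: Int, submissionTime: Long, activeTasks: Int)
  final case class DeadExecutor(id: String, removeTime: Long)

  // Completed stages are dropped from the live list only when activeTasks is exactly zero,
  // so a stage stuck at -1 is never dropped.
  def stillLive(completedStages: Seq[LiveStage]): Seq[LiveStage] =
    completedStages.filter(_.activeTasks != 0)

  // A dead executor is kept while any live stage was submitted before it was removed.
  def retained(dead: Seq[DeadExecutor], live: Seq[LiveStage]): Seq[DeadExecutor] =
    dead.filter(e => live.exists(_.submissionTime < e.removeTime))
}
```

A stage submitted at time 1 with `activeTasks = -1` keeps an executor removed at time 5 in the retained list indefinitely, whereas the same stage with `activeTasks = 0` lets it be dropped.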