tgravescs commented on a change in pull request #32526:
URL: https://github.com/apache/spark/pull/32526#discussion_r638018207



##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -859,6 +847,29 @@ private[spark] class ExecutorAllocationManager(
       allocationManager.synchronized {
         // Clear unschedulableTaskSets since atleast one task becomes schedulable now
         unschedulableTaskSets.remove(stageAttempt)
+        removeStageFromResourceProfileIfUnused(stageAttempt)
+      }
+    }
+
+    def removeStageFromResourceProfileIfUnused(stageAttempt: StageAttempt): Unit = {
+      if (!stageAttemptToNumRunningTask.contains(stageAttempt) &&
+          !stageAttemptToNumTasks.contains(stageAttempt) &&
+          !stageAttemptToNumSpeculativeTasks.contains(stageAttempt) &&
+          !stageAttemptToTaskIndices.contains(stageAttempt) &&
+          !stageAttemptToSpeculativeTaskIndices.contains(stageAttempt) &&
+          !unschedulableTaskSets.contains(stageAttempt)

Review comment:
       We don't need to check unschedulableTaskSets here. If an attempt gets
into that set, it should also have an entry in stageAttemptToNumTasks. If the
stage has already completed, its stageAttemptToNumTasks entry would have been
removed, but at that point we shouldn't still be trying to schedule it.
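
For illustration only, here is a minimal sketch of what the condition could look like with the unschedulableTaskSets check dropped. The field names are taken from the diff above; the StageTracking class, the isUnused helper, the StageAttempt case class, and the collection types are assumptions made so the snippet stands alone, not the actual code in ExecutorAllocationManager.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the StageAttempt type used in the diff.
final case class StageAttempt(stageId: Int, stageAttemptId: Int)

// Simplified, assumed model of the listener's bookkeeping. Only the field
// names come from the diff; the element types are guesses for illustration.
class StageTracking {
  val stageAttemptToNumRunningTask = mutable.HashMap.empty[StageAttempt, Int]
  val stageAttemptToNumTasks = mutable.HashMap.empty[StageAttempt, Int]
  val stageAttemptToNumSpeculativeTasks = mutable.HashMap.empty[StageAttempt, Int]
  val stageAttemptToTaskIndices =
    mutable.HashMap.empty[StageAttempt, mutable.HashSet[Int]]
  val stageAttemptToSpeculativeTaskIndices =
    mutable.HashMap.empty[StageAttempt, mutable.HashSet[Int]]
  val unschedulableTaskSets = mutable.HashSet.empty[StageAttempt]

  // The condition from the diff without the unschedulableTaskSets check.
  // This relies on the invariant described in the review comment: an attempt
  // in unschedulableTaskSets also has an entry in stageAttemptToNumTasks.
  def isUnused(stageAttempt: StageAttempt): Boolean =
    !stageAttemptToNumRunningTask.contains(stageAttempt) &&
      !stageAttemptToNumTasks.contains(stageAttempt) &&
      !stageAttemptToNumSpeculativeTasks.contains(stageAttempt) &&
      !stageAttemptToTaskIndices.contains(stageAttempt) &&
      !stageAttemptToSpeculativeTaskIndices.contains(stageAttempt)
}
```

As long as that invariant holds, an attempt that is still in unschedulableTaskSets is already reported as in use through stageAttemptToNumTasks, so the extra check adds nothing.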




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


