venkata91 commented on a change in pull request #28287:
URL: https://github.com/apache/spark/pull/28287#discussion_r455205081



##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -829,12 +873,28 @@ private[spark] class ExecutorAllocationManager(
       numTotalTasks - numRunning
     }
 
+    // Currently TaskSetManager.getCompletelyBlacklistedTaskIfAny only takes 
the first unschedulable
+    // task found due to blacklisting. This way we only need to keep track of 
the unschedulable
+    // tasksets which is an indirect way to get the current number of 
unschedulable tasks.
+    def pendingUnschedulableTaskSetsPerResourceProfile(rp: Int): Int = {
+      val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, 
Set.empty).toSeq
+      attempts.filter(attempt => unschedulableTaskSets.contains(attempt)).size
+    }
+
+    def hasPendingUnschedulableTasks: Boolean = {
+      val attemptSets = resourceProfileIdToStageAttempt.values
+      attemptSets.exists { attempts =>
+        attempts.exists(unschedulableTaskSets.contains(_))
+      }
+    }
+
     def hasPendingTasks: Boolean = {
-      hasPendingSpeculativeTasks || hasPendingRegularTasks
+      hasPendingSpeculativeTasks || hasPendingRegularTasks || 
hasPendingUnschedulableTasks

Review comment:
       No, this is not needed. I will remove this. Thanks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to