tgravescs commented on a change in pull request #28287:
URL: https://github.com/apache/spark/pull/28287#discussion_r451820344
##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -652,18 +652,26 @@ private[spark] class TaskSchedulerImpl(
      case Some ((executorId, _)) =>
Review comment:
I think the above comment needs to be updated.
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -281,6 +281,7 @@ private[spark] class ExecutorAllocationManager(
   private def maxNumExecutorsNeededPerResourceProfile(rpId: Int): Int = {
     val pending = listener.totalPendingTasksPerResourceProfile(rpId)
     val pendingSpeculative = listener.pendingSpeculativeTasksPerResourceProfile(rpId)
+    val numUnschedulables = listener.pendingUnschedulableTaskSetsPerResourceProfile(rpId)
Review comment:
We should clarify that this counts unschedulable task sets, not the number
of unschedulable tasks.
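For example, something like this would make the distinction explicit (the
`numUnschedulableTaskSets` name is only a suggestion, not what the PR uses):

```scala
// This counts task sets with an unschedulable task, not individual tasks.
val numUnschedulableTaskSets =
  listener.pendingUnschedulableTaskSetsPerResourceProfile(rpId)
```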
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -311,6 +311,16 @@ private[spark] class DAGScheduler(
     eventProcessLoop.post(SpeculativeTaskSubmitted(task))
   }
+  /**
+   * Called by the TaskSetManager when there is an unschedulable blacklist task and dynamic
+   * allocation is enabled
+   */
+  def unschedulableBlacklistTaskSubmitted(
Review comment:
I think we should rename this. Maybe just remove "Submitted", since this
isn't really task submission.
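For example, just dropping the suffix (the parameter list is cut off in the
diff above, so it is elided here too):

```scala
/**
 * Called by the TaskSetManager when there is an unschedulable blacklist task and dynamic
 * allocation is enabled.
 */
def unschedulableBlacklistTask( /* same parameters as in the PR */ ): Unit = {
  // same body as in the PR
}
```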
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -789,6 +810,23 @@ private[spark] class ExecutorAllocationManager(
       }
     }
+    override def onUnschedulableTaskSet
+        (unschedulableTaskSet: SparkListenerUnschedulableTaskSet): Unit = {
Review comment:
nit: fix the indentation and style. The "(" should be on the previous line,
with the parameters indented 4 spaces.
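Concretely, the style being asked for looks like this (sketch only):

```scala
override def onUnschedulableTaskSet(
    unschedulableTaskSet: SparkListenerUnschedulableTaskSet): Unit = {
  // ...
}
```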
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -829,12 +867,28 @@ private[spark] class ExecutorAllocationManager(
       numTotalTasks - numRunning
     }
+    // Currently TaskSetManager.getCompletelyBlacklistedTaskIfAny only takes the first
+    // unschedulable task due to blacklisting. So keeping track of unschedulableTaskSets
+    // should be enough as we'll always have no more than a task unschedulable at any time.
Review comment:
I think this comment is confusing, specifically "as we'll always have no
more than a task unschedulable at any time". That isn't true: you could have
lots of tasks blacklisted, and thus unschedulable, at any time. That function
only returns the first one it finds.
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -789,6 +811,23 @@ private[spark] class ExecutorAllocationManager(
       }
     }
+    override def onUnschedulableBlacklistTaskSubmitted
+        (blacklistedTask: SparkListenerUnschedulableBlacklistTaskSubmitted): Unit = {
+      val stageId = blacklistedTask.stageId
+      val stageAttemptId = blacklistedTask.stageAttemptId
+      allocationManager.synchronized {
+        (stageId, stageAttemptId) match {
+          case (Some(stageId), Some(stageAttemptId)) =>
+            val stageAttempt = StageAttempt(stageId, stageAttemptId)
+            unschedulableTaskSets.add(stageAttempt)
+          case (None, _) =>
+            // Clear unschedulableTaskSets since at least one task becomes schedulable now
+            unschedulableTaskSets.clear()
Review comment:
Yeah, I could go either way on this as well. If we only remove that one task
set, it could help us get more executors faster when a lot of tasks are
blacklisted, or when you only requested a few executors to begin with. I also
think it's a bit out of the norm for us to rely on the parameters here being
None; traditionally we would have two messages, one for it being added and one
for it being removed. Perhaps onUnschedulableTaskSetAdded and
onUnschedulableTaskSetRemoved.
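A sketch of that two-event shape (the listener event class names and their
payloads here are illustrative, not from the PR):

```scala
override def onUnschedulableTaskSetAdded(
    event: SparkListenerUnschedulableTaskSetAdded): Unit = {
  allocationManager.synchronized {
    unschedulableTaskSets.add(StageAttempt(event.stageId, event.stageAttemptId))
  }
}

override def onUnschedulableTaskSetRemoved(
    event: SparkListenerUnschedulableTaskSetRemoved): Unit = {
  allocationManager.synchronized {
    unschedulableTaskSets.remove(StageAttempt(event.stageId, event.stageAttemptId))
  }
}
```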
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -289,13 +290,26 @@ private[spark] class ExecutorAllocationManager(
       s" tasksperexecutor: $tasksPerExecutor")
     val maxNeeded = math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
       tasksPerExecutor).toInt
-    if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
+    val maxNeededWithSpeculationLocalityOffset =
+      if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
       // If we have pending speculative tasks and only need a single executor, allocate one more
       // to satisfy the locality requirements of speculation
       maxNeeded + 1
     } else {
       maxNeeded
     }
+
+    // Since the maxNeededWithSpeculationLocalityOffset already includes the num executors needed
+    // to run unschedulable tasks, we would only try to add when the
+    // maxNeededWithSpeculationLocalityOffset is lesser than the active executors
Review comment:
This comment is also confusing to me, and it sits above a check on
numUnschedulables, which has nothing to do with the comment. Can we move it
down next to the check on line 308 and clarify it? I think you mean that we
don't add any executors for the unschedulable task sets if the number of
active executors is less than the number we are already asking for.
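Roughly what I have in mind, placed next to that check (`numExecutorsActive`
is a hypothetical name here; the other names are from the diff):

```scala
// Don't request extra executors for unschedulable task sets if we are already
// asking for more executors than are currently active.
if (numUnschedulables > 0 && maxNeededWithSpeculationLocalityOffset <= numExecutorsActive) {
  // account for the unschedulable task sets when sizing the request
}
```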
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]