squito commented on a change in pull request #23677: [SPARK-26755][SCHEDULER] :
Optimize Spark Scheduler to dequeue speculative tasks…
URL: https://github.com/apache/spark/pull/23677#discussion_r299586081
##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -398,23 +382,25 @@ private[spark] class TaskSetManager(
{
// Check for process-local tasks; note that tasks can be process-local
// on multiple nodes when we replicate cached blocks, as in Spark Streaming
- for (index <- dequeueSpeculativeTaskFromList(
- execId, host, pendingSpeculatableTasksForExecutor.getOrElse(execId, HashSet()))) {
+ for (index <- dequeueTaskFromList(
+ execId, host, pendingSpeculatableTasks.forExecutor.getOrElse(execId, ArrayBuffer()),
+ speculative = true)) {
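Review comment:
Once more, I'd just get rid of `dequeueSpeculativeTask` completely and let a single helper handle both regular and speculative tasks, selected by a `speculative` flag: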
```scala
/**
 * Dequeues a pending task for the given executor, at a locality level no worse than
 * `maxLocality`.
 *
 * Regular (non-speculative) pending tasks are always tried first. Only when none can be
 * scheduled is a speculative copy considered. `Option.orElse` takes its argument by name, so
 * the speculative lookup runs only when the regular lookup returned `None`.
 *
 * @param execId      id of the executor offering resources
 * @param host        host the executor runs on
 * @param maxLocality least-local level the caller is currently willing to accept
 * @return `Some((taskIndex, localityLevel, speculative))` for the dequeued task, or `None`
 */
private def dequeueTask(
    execId: String,
    host: String,
    maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
  // If we didn't schedule a regular task, try to schedule a speculative one.
  dequeueTaskHelper(execId, host, maxLocality, speculative = false)
    .orElse(dequeueTaskHelper(execId, host, maxLocality, speculative = true))
}
/**
 * Dequeues one task from either the regular or the speculative pending-task index, walking
 * locality levels from most to least local.
 *
 * Levels are tried in this order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY.
 * PROCESS_LOCAL is always tried. Every other level is tried only if
 * `TaskLocality.isAllowed(maxLocality, level)` holds. The chain uses `Option.orElse`, whose
 * argument is by-name, so a later level is only evaluated when every earlier level returned
 * `None`. This avoids non-local `return`s from inside closures.
 *
 * @param execId      id of the executor offering resources
 * @param host        host the executor runs on
 * @param maxLocality least-local level the caller is currently willing to accept
 * @param speculative whether to draw from `pendingSpeculatableTasks` instead of `pendingTasks`
 * @return `Some((taskIndex, localityLevel, speculative))`, or `None` if nothing was dequeued
 */
private def dequeueTaskHelper(
    execId: String,
    host: String,
    maxLocality: TaskLocality.Value,
    speculative: Boolean): Option[(Int, TaskLocality.Value, Boolean)] = {
  if (speculative && speculatableTasks.isEmpty) {
    // No task has been marked speculatable, so skip scanning the speculative index.
    None
  } else {
    val pendingTaskSetToUse = if (speculative) pendingSpeculatableTasks else pendingTasks

    // NOTE(review): exact filtering done by dequeueTaskFromList (e.g. skipping tasks already
    // running on this host) is defined elsewhere -- confirm against its implementation.
    def dequeue(list: ArrayBuffer[Int]): Option[Int] =
      dequeueTaskFromList(execId, host, list, speculative)

    // Evaluates `candidate` only when `locality` is permitted by `maxLocality`, and tags the
    // dequeued index with that locality level.
    def tryLevel(locality: TaskLocality.Value)(
        candidate: => Option[Int]): Option[(Int, TaskLocality.Value, Boolean)] = {
      if (TaskLocality.isAllowed(maxLocality, locality)) {
        candidate.map(index => (index, locality, speculative))
      } else {
        None
      }
    }

    dequeue(pendingTaskSetToUse.forExecutor.getOrElse(execId, ArrayBuffer()))
      .map(index => (index, TaskLocality.PROCESS_LOCAL, speculative))
      .orElse(tryLevel(TaskLocality.NODE_LOCAL) {
        dequeue(pendingTaskSetToUse.forHost.getOrElse(host, ArrayBuffer()))
      })
      // Look for noPref tasks after NODE_LOCAL to minimize cross-rack traffic.
      .orElse(tryLevel(TaskLocality.NO_PREF) {
        dequeue(pendingTaskSetToUse.noPrefs)
      })
      .orElse(tryLevel(TaskLocality.RACK_LOCAL) {
        sched.getRackForHost(host).flatMap { rack =>
          dequeue(pendingTaskSetToUse.forRack.getOrElse(rack, ArrayBuffer()))
        }
      })
      .orElse(tryLevel(TaskLocality.ANY) {
        dequeue(pendingTaskSetToUse.anyPrefs)
      })
  }
}
```
As this suggestion is a little long, I've also pushed it to a branch:
https://github.com/squito/spark/tree/speculation_cleanup
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]