squito commented on a change in pull request #23677: [SPARK-26755][SCHEDULER] : Optimize Spark Scheduler to dequeue speculative tasks… URL: https://github.com/apache/spark/pull/23677#discussion_r298358439
########## File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ##########

```diff
@@ -158,9 +158,21 @@ private[spark] class TaskSetManager(
   // Set containing all pending tasks (also used as a stack, as above).
   private val allPendingTasks = new ArrayBuffer[Int]

-  // Tasks that can be speculated. Since these will be a small fraction of total
-  // tasks, we'll just hold them in a HashSet.
-  private[scheduler] val speculatableTasks = new HashSet[Int]
+  // Set of pending tasks that can be speculated for each executor.
+  private[scheduler] var pendingSpeculatableTasksForExecutor =
+    new HashMap[String, HashSet[Int]]
+
+  // Set of pending tasks that can be speculated for each host.
+  private[scheduler] var pendingSpeculatableTasksForHost = new HashMap[String, HashSet[Int]]
+
+  // Set of pending tasks that can be speculated with no locality preferences.
+  private[scheduler] val pendingSpeculatableTasksWithNoPrefs = new HashSet[Int]
+
+  // Set of pending tasks that can be speculated for each rack.
+  private[scheduler] var pendingSpeculatableTasksForRack = new HashMap[String, HashSet[Int]]
+
+  // Set of all pending tasks that can be speculated.
+  private[scheduler] val allPendingSpeculatableTasks = new HashSet[Int]
```

Review comment: Overall, this makes a lot of sense. I see how you're replacing a bunch of `O(numSpeculatableTasks)` operations with things specific to tasks targeted at each executor, etc., just like the other scheduler methods.

I don't really like how much code duplication there is between the speculative and non-speculative versions, for what seem like minor differences. I was thinking you could create a private case class like

```scala
case class PendingTasksByLocality(
    forExecutor: HashMap[String, ArrayBuffer[Int]],
    forHost: HashMap[String, ArrayBuffer[Int]],
    ...)
```

with one instance for all pending tasks and one for pending speculative tasks. Then a lot of the functions could be combined (e.g. `dequeueSpeculativeTaskFromList` & `dequeueTaskFromList`) just by adding a `speculative: Boolean` parameter. Your new data structures are `HashMap[String, HashSet]` instead of `HashMap[String, ArrayBuffer]`, but I think you can go back to `ArrayBuffer` if you *also* keep `val speculatableTasks = new HashSet[Int]` to avoid adding duplicate entries.
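To make the suggestion concrete, here is a minimal, self-contained sketch of the proposed shape. It is not the actual Spark code: the names `SchedulerSketch`, `enqueueSpeculativeTask`, and the field names of `PendingTasksByLocality` beyond those quoted in the review are hypothetical, and the real `TaskSetManager` logic (locality levels, task validity checks) is omitted.

```scala
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

// One structure holding the locality-keyed pending-task lists, as suggested
// in the review; instantiated once for regular and once for speculative tasks.
case class PendingTasksByLocality(
    forExecutor: HashMap[String, ArrayBuffer[Int]] = HashMap.empty,
    forHost: HashMap[String, ArrayBuffer[Int]] = HashMap.empty,
    noPrefs: ArrayBuffer[Int] = ArrayBuffer.empty,
    forRack: HashMap[String, ArrayBuffer[Int]] = HashMap.empty,
    all: ArrayBuffer[Int] = ArrayBuffer.empty)

object SchedulerSketch {
  val pendingTasks = PendingTasksByLocality()
  val pendingSpeculatableTasks = PendingTasksByLocality()

  // Kept as a HashSet purely to reject duplicate speculative entries,
  // so the per-locality lists can stay ArrayBuffers.
  val speculatableTasks = HashSet.empty[Int]

  def enqueueSpeculativeTask(index: Int, executor: String): Unit = {
    if (speculatableTasks.add(index)) { // add returns false on duplicates
      pendingSpeculatableTasks.forExecutor
        .getOrElseUpdate(executor, ArrayBuffer.empty) += index
      pendingSpeculatableTasks.all += index
    }
  }

  // One dequeue path shared by both categories, selected by the flag,
  // instead of parallel speculative/non-speculative implementations.
  def dequeueTaskFromList(speculative: Boolean, executor: String): Option[Int] = {
    val pending = if (speculative) pendingSpeculatableTasks else pendingTasks
    pending.forExecutor.get(executor).filter(_.nonEmpty).map { list =>
      list.remove(list.size - 1) // pop from the end, stack-style
    }
  }
}
```

The dedupe check lives at enqueue time, so the hot dequeue path stays a plain `ArrayBuffer` pop; that is the trade-off the review is pointing at when it suggests keeping `speculatableTasks` alongside the buffers.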
