squito commented on a change in pull request #23677: [SPARK-26755][SCHEDULER]: Optimize Spark Scheduler to dequeue speculative tasks…
URL: https://github.com/apache/spark/pull/23677#discussion_r298358439
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
 ##########
 @@ -158,9 +158,21 @@ private[spark] class TaskSetManager(
   // Set containing all pending tasks (also used as a stack, as above).
   private val allPendingTasks = new ArrayBuffer[Int]
 
 -  // Tasks that can be speculated. Since these will be a small fraction of total
 -  // tasks, we'll just hold them in a HashSet.
 -  private[scheduler] val speculatableTasks = new HashSet[Int]
+  // Set of pending tasks that can be speculated for each executor.
+  private[scheduler] var pendingSpeculatableTasksForExecutor =
+    new HashMap[String, HashSet[Int]]
+
+  // Set of pending tasks that can be speculated for each host.
 +  private[scheduler] var pendingSpeculatableTasksForHost = new HashMap[String, HashSet[Int]]
+
+  // Set of pending tasks that can be speculated with no locality preferences.
+  private[scheduler] val pendingSpeculatableTasksWithNoPrefs = new HashSet[Int]
+
+  // Set of pending tasks that can be speculated for each rack.
 +  private[scheduler] var pendingSpeculatableTasksForRack = new HashMap[String, HashSet[Int]]
+
+  // Set of all pending tasks that can be speculated.
+  private[scheduler] val allPendingSpeculatableTasks = new HashSet[Int]
 
 Review comment:
   Overall, this makes a lot of sense. I see how you're replacing a bunch of `O(numSpeculatableTasks)` operations with structures specific to the tasks targeted at each executor, host, and rack, just like the other scheduler methods.
   
   I don't really like how much code duplication there is between the speculative and non-speculative versions, for what seem like minor differences. I was thinking you could create a private case class like
   
   ```scala
   case class PendingTasksByLocality(forExecutor: HashMap[String, ArrayBuffer[Int]], forHost: HashMap[String, ArrayBuffer[Int]], ...)
   ```
   
   with one instance for all pending tasks and one for pending speculative tasks. Then a lot of the functions could be combined (e.g. dequeueSpeculativeTaskFromList & dequeueTaskFromList) just by adding a `speculative: Boolean` parameter. Your new data structures are `HashMap[String, HashSet]` instead of `HashMap[String, ArrayBuffer]`, but I think you can go back to `ArrayBuffer` if you *also* keep `val speculatableTasks = new HashSet[Int]` to avoid adding duplicate entries.
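
   A minimal, standalone sketch of what that refactor could look like. Everything here is illustrative: `PendingTasksDemo`, `enqueue`, and `dequeueForExecutor` are made-up names (not Spark's actual `TaskSetManager` API), and cleanup of the secondary lists on dequeue is elided.

   ```scala
   import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

   // One container for the per-locality pending-task lists, shared by the
   // regular and speculative paths (hypothetical, simplified fields).
   case class PendingTasksByLocality(
       forExecutor: HashMap[String, ArrayBuffer[Int]] = HashMap.empty,
       forHost: HashMap[String, ArrayBuffer[Int]] = HashMap.empty,
       noPrefs: ArrayBuffer[Int] = ArrayBuffer.empty,
       forRack: HashMap[String, ArrayBuffer[Int]] = HashMap.empty,
       all: ArrayBuffer[Int] = ArrayBuffer.empty)

   object PendingTasksDemo {
     // One instance for ordinary pending tasks, one for speculative ones.
     val pendingTasks = PendingTasksByLocality()
     val pendingSpeculatableTasks = PendingTasksByLocality()
     // Kept alongside the ArrayBuffers so duplicate speculative entries are rejected.
     val speculatableTasks = new HashSet[Int]

     private def pendingLists(speculative: Boolean): PendingTasksByLocality =
       if (speculative) pendingSpeculatableTasks else pendingTasks

     def enqueue(taskId: Int, executor: String, speculative: Boolean): Boolean = {
       // HashSet.add returns false when the task was already queued speculatively.
       if (speculative && !speculatableTasks.add(taskId)) return false
       val p = pendingLists(speculative)
       p.forExecutor.getOrElseUpdate(executor, new ArrayBuffer[Int]) += taskId
       p.all += taskId
       true
     }

     // A single dequeue path for both variants, selected by the `speculative`
     // flag, in the spirit of merging dequeueTaskFromList and
     // dequeueSpeculativeTaskFromList. Removal from `all` / `speculatableTasks`
     // is omitted for brevity.
     def dequeueForExecutor(executor: String, speculative: Boolean): Option[Int] = {
       pendingLists(speculative).forExecutor.get(executor) match {
         case Some(list) if list.nonEmpty => Some(list.remove(list.size - 1)) // stack order
         case _ => None
       }
     }
   }
   ```

   The point of the shape is that each public scheduler method touches one `PendingTasksByLocality` chosen by the boolean, so the locality-bucketing logic exists once instead of twice.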

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
