mridulm commented on a change in pull request #34743:
URL: https://github.com/apache/spark/pull/34743#discussion_r762517901



##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -259,21 +259,29 @@ private[spark] class TaskSetManager(
       loc match {
         case e: ExecutorCacheTaskLocation =>
           pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
+          pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
         case e: HDFSCacheTaskLocation =>
           val exe = sched.getExecutorsAliveOnHost(loc.host)
           exe match {
             case Some(set) =>
               for (e <- set) {
                 pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
               }
+              pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
               logInfo(s"Pending task $index has a cached location at ${e.host} " +
                 ", where there are executors " + set.mkString(","))
             case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
               ", but there are no executors alive there.")
           }
-        case _ =>
+        case _: HostTaskLocation =>
+          val exe = sched.getExecutorsAliveOnHost(loc.host)
+          exe match {
+            case Some(_) =>
+              pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
+            case _ => logDebug(s"Pending task $index has a location at ${loc.host} " +
+              ", but there are no executors alive there.")
+          }
       }
-      pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index

Review comment:
   > In fact a permanent pending is possible, if there are not many resources, spark.locality.wait.node defaults to 3s, and all remaining resources are tried within this time range, then there is no chance to get to the next TaskLocality, although computeValidLocalityLevels returns [PROCESS_LOCAL, NODE_LOCAL, ANY].
   
   I am confused about this statement: if there are no resources available, no tasks can be scheduled, and so Spark will wait indefinitely.
   
   If/when resources do become available, the scheduler will move to the next locality level once the previous level times out.
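   For context, here is a minimal, self-contained sketch of that delay-scheduling escalation. It approximates `TaskSetManager.getAllowedLocalityLevel`; the state fields and reset semantics are simplified stand-ins, not the actual Spark source.
   
   ```scala
   object DelaySchedulingSketch {
     // Hypothetical stand-ins for TaskSetManager state (illustration only).
     val myLocalityLevels = Array("PROCESS_LOCAL", "NODE_LOCAL", "ANY")
     val localityWaits    = Array(3000L, 3000L, 0L) // ms; spark.locality.wait.node defaults to 3s
     var currentLocalityIndex = 0
     var lastLaunchTime       = 0L
   
     // Escalate to a less local level once the wait at the current level has
     // expired without a task launch at that level.
     def getAllowedLocalityLevel(curTime: Long): String = {
       while (currentLocalityIndex < myLocalityLevels.length - 1 &&
           curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
         lastLaunchTime += localityWaits(currentLocalityIndex)
         currentLocalityIndex += 1
       }
       myLocalityLevels(currentLocalityIndex)
     }
   
     def main(args: Array[String]): Unit = {
       println(getAllowedLocalityLevel(1000L)) // PROCESS_LOCAL: still within the 3s wait
       println(getAllowedLocalityLevel(7000L)) // ANY: both 3s waits have expired
     }
   }
   ```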
   
   > I think it would be better to treat the TaskLocation special in this case, as the current code also treats the HDFSCacheTaskLocation special
   
   Current behavior of `HDFSCacheTaskLocation` is:
   a) If there are executors alive on the host, add the task to `forExecutor` for each of them.
   b) Add the task to `forHost`.
   
   (b) is done for all tasks which have a locality preference.
   
   Whenever a new executor is added or an existing executor is removed, `computeValidLocalityLevels` iterates through the various `for<LEVEL>` structures to check whether there are executors available at that LEVEL, which determines whether the level is kept for scheduling.
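   To make that dependency concrete, here is a hedged sketch of the shape of `computeValidLocalityLevels`. The pending maps and liveness checks are hypothetical stand-ins for illustration, not the actual Spark source.
   
   ```scala
   import scala.collection.mutable
   
   object ValidLocalityLevelsSketch {
     // Stand-ins for the per-level pending-task maps (task indices keyed by
     // executor id / host name).
     val forExecutor = mutable.HashMap[String, mutable.ArrayBuffer[Int]]()
     val forHost     = mutable.HashMap[String, mutable.ArrayBuffer[Int]]()
   
     // Hypothetical liveness checks standing in for the scheduler's.
     def isExecutorAlive(execId: String): Boolean = execId == "exec-1"
     def hasExecutorsAliveOnHost(host: String): Boolean = host == "host-a"
   
     // A level survives only if some pending entry at that level has a live
     // candidate; ANY is always kept.
     def computeValidLocalityLevels(): Seq[String] = {
       val levels = mutable.ArrayBuffer[String]()
       if (forExecutor.nonEmpty && forExecutor.keySet.exists(isExecutorAlive)) {
         levels += "PROCESS_LOCAL"
       }
       if (forHost.nonEmpty && forHost.keySet.exists(hasExecutorsAliveOnHost)) {
         levels += "NODE_LOCAL"
       }
       levels += "ANY"
       levels.toSeq
     }
   }
   ```
   
   Note how the NODE_LOCAL check walks `forHost.keySet`: a task that was never added to `forHost` stays invisible to it when the levels are recomputed, no matter which executors register later.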
   
   What is proposed in the PR will break `computeValidLocalityLevels`, since after this change the addition of a new executor will not bring back host-level scheduling for tasks that were skipped from `forHost`.
   
   If the issue we are attempting to solve is:
   "for the current pending tasks, with the executor resources available to the application, disregard a locality level if there are no valid candidates for that level"
   
   then the current PR does not solve it - though this is a good problem to address in the scheduler.
   I do agree that setting the wait time to 0 just to mitigate this issue is indeed suboptimal.
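   (For reference, that mitigation amounts to something like the following, e.g. in spark-shell; illustrative only, not a recommendation:)
   
   ```scala
   import org.apache.spark.SparkConf
   
   // Zero out the node-level locality wait so the scheduler never stalls
   // waiting for NODE_LOCAL slots. This trades data locality away globally.
   val conf = new SparkConf().set("spark.locality.wait.node", "0s")
   ```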
   
   We will have to look at the cost/benefit of a proposal which does fix this issue, since I suspect the costs would not be cheap (the various materialized data structures in the scheduler exist precisely to keep scheduling costs low).
   
   Hope that clarifies.



