guiyanakuang commented on a change in pull request #34743:
URL: https://github.com/apache/spark/pull/34743#discussion_r762409383
##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -259,21 +259,29 @@ private[spark] class TaskSetManager(
       loc match {
         case e: ExecutorCacheTaskLocation =>
           pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
+          pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
         case e: HDFSCacheTaskLocation =>
           val exe = sched.getExecutorsAliveOnHost(loc.host)
           exe match {
             case Some(set) =>
               for (e <- set) {
                 pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
               }
+              pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
               logInfo(s"Pending task $index has a cached location at ${e.host} " +
                 ", where there are executors " + set.mkString(","))
             case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
               ", but there are no executors alive there.")
           }
-        case _ =>
+        case _: HostTaskLocation =>
+          val exe = sched.getExecutorsAliveOnHost(loc.host)
+          exe match {
+            case Some(_) =>
+              pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
+            case _ => logDebug(s"Pending task $index has a location at ${loc.host} " +
+              ", but there are no executors alive there.")
+          }
       }
-      pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
Review comment:
Update:
@DefaultEntry[(kv: driver-host, ArrayBuffer())]. Indeed, Task 0 finished at
NODE_LOCAL, but Tasks 1 and 2 have remained in the pending state.
@mridulm, thanks for the detailed answer. I currently avoid the pending tasks
by setting spark.locality.wait.node to 0. A permanent pending state is in fact
possible: when resources are scarce and all remaining resources are tried
within the spark.locality.wait.node window (3s by default), the task set never
gets a chance to move on to the next TaskLocality, even though
`computeValidLocalityLevels` returns [PROCESS_LOCAL, NODE_LOCAL, ANY].
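
For reference, the workaround mentioned above could be applied roughly as in the following sketch. It assumes spark-core is on the classpath; the object name, method name, and application name are hypothetical and chosen only for illustration.

```scala
import org.apache.spark.SparkConf

object LocalityWaitWorkaround {
  // Build a SparkConf that disables the node-level locality wait, so the
  // scheduler does not hold tasks back while waiting for a NODE_LOCAL slot.
  def buildConf(): SparkConf =
    new SparkConf()
      .setAppName("locality-wait-example") // hypothetical application name
      .set("spark.locality.wait.node", "0")
}
```

The same setting can be passed on the command line with `--conf spark.locality.wait.node=0`. Note that this trades data locality for scheduling latency across the whole application, which is why it is described here as a workaround rather than a fix.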
While setting spark.locality.wait.node to 0 eased the trouble in my production
environment, I think it would be better to special-case the TaskLocation
here, just as the current code already special-cases HDFSCacheTaskLocation:
https://github.com/apache/spark/blob/030de1d09f121b167aaaa8237a2807f902c1e710/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L262-L272
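
To illustrate the proposal, here is a minimal, self-contained sketch of the bucketing behaviour the diff above aims for. It is not Spark's actual code: `Loc`, `ExecutorCacheLoc`, `HostLoc`, `Pending`, and the `aliveExecutors` map are simplified, hypothetical stand-ins for `TaskLocation`, the pending-task maps, and `sched.getExecutorsAliveOnHost`. The HDFS cache case is omitted for brevity.

```scala
import scala.collection.mutable.{ArrayBuffer, HashMap}

object PendingTasksSketch {
  // Hypothetical, simplified stand-ins for Spark's TaskLocation hierarchy.
  sealed trait Loc { def host: String }
  final case class ExecutorCacheLoc(host: String, executorId: String) extends Loc
  final case class HostLoc(host: String) extends Loc

  // Simplified stand-in for the per-executor and per-host pending-task maps.
  final class Pending {
    val forExecutor = new HashMap[String, ArrayBuffer[Int]]
    val forHost = new HashMap[String, ArrayBuffer[Int]]
  }

  // aliveExecutors maps a host name to the executor ids currently alive on it.
  def addPendingTask(
      index: Int,
      locs: Seq[Loc],
      aliveExecutors: Map[String, Set[String]],
      pending: Pending): Unit = {
    for (loc <- locs) loc match {
      case e: ExecutorCacheLoc =>
        pending.forExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer[Int]) += index
        pending.forHost.getOrElseUpdate(e.host, new ArrayBuffer[Int]) += index
      case h: HostLoc =>
        // Only register a host-level preference when an executor is alive
        // there; otherwise the task would wait for a slot that cannot appear.
        if (aliveExecutors.get(h.host).exists(_.nonEmpty)) {
          pending.forHost.getOrElseUpdate(h.host, new ArrayBuffer[Int]) += index
        }
    }
  }
}
```

With this sketch, a task whose only preferred location is a host without executors (for example a hypothetical `driver-host`) is not added to `forHost`, so under the assumptions above NODE_LOCAL would not be counted as a level worth waiting for on its behalf.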
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]