guiyanakuang commented on a change in pull request #34743:
URL: https://github.com/apache/spark/pull/34743#discussion_r760721345
##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -259,21 +259,29 @@ private[spark] class TaskSetManager(
     loc match {
       case e: ExecutorCacheTaskLocation =>
         pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
+        pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
       case e: HDFSCacheTaskLocation =>
         val exe = sched.getExecutorsAliveOnHost(loc.host)
         exe match {
           case Some(set) =>
             for (e <- set) {
               pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
             }
+            pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
             logInfo(s"Pending task $index has a cached location at ${e.host} " +
               ", where there are executors " + set.mkString(","))
           case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
             ", but there are no executors alive there.")
         }
-      case _ =>
+      case _: HostTaskLocation =>
+        val exe = sched.getExecutorsAliveOnHost(loc.host)
+        exe match {
+          case Some(_) =>
+            pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
+          case _ => logDebug(s"Pending task $index has a location at ${loc.host} " +
+            ", but there are no executors alive there.")
+        }
     }
-    pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
Review comment:
@mridulm Thank you for your review.
I recognize the downside of this approach: it gives up many opportunities that a little waiting could have turned into local execution.
The problem in my case is that the host has no Worker or NodeManager, so an executor can never be started on that host. In addition, the `getAllowedLocalityLevel` method prevents the task from falling back to a less-local level, so the task can never actually be executed.
Based on this, I have two options in mind:
1. Implement a new task locality, e.g. `HDFSDiskCacheTaskLocation`, instead of `HostTaskLocation` here, so that the alive-executor check is performed only for this case.
https://github.com/apache/spark/blob/ffe3fc9d23967e41092cf67539aa7f0d77b9eb75/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L456-L460
2. Modify the implementation of `getAllowedLocalityLevel` to allow falling back to a farther locality level after a limited number of attempts.
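As a rough sketch of option 1, the idea could look like the following. Note this is a hedged illustration only: `HDFSDiskCacheTaskLocation` does not exist in Spark today, and `executorsAliveOnHost` is a hypothetical stand-in for `sched.getExecutorsAliveOnHost`.

```scala
// Sketch only: all names below are assumptions, not Spark's current API.
sealed trait TaskLocation { def host: String }

case class HostTaskLocation(host: String) extends TaskLocation

// Option 1: a dedicated locality type for HDFS on-disk cache, so the
// scheduler can treat it differently from plain host locality.
case class HDFSDiskCacheTaskLocation(host: String) extends TaskLocation

object LocalitySketch {
  // Stand-in for sched.getExecutorsAliveOnHost with fixed demo data.
  def executorsAliveOnHost(host: String): Option[Set[String]] =
    if (host == "host-a") Some(Set("exec-1")) else None

  // Only the new locality type triggers the alive-executor check;
  // other locations are enqueued unconditionally, as before.
  def shouldEnqueueForHost(loc: TaskLocation): Boolean = loc match {
    case _: HDFSDiskCacheTaskLocation => executorsAliveOnHost(loc.host).isDefined
    case _ => true
  }
}
```

With this split, a `HostTaskLocation` on a dead host would still be enqueued (preserving the chance that an executor comes up later), while the disk-cache case, where no executor can ever appear, is skipped.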
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]