guiyanakuang commented on a change in pull request #34743:
URL: https://github.com/apache/spark/pull/34743#discussion_r760877316
##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -259,21 +259,29 @@ private[spark] class TaskSetManager(
loc match {
case e: ExecutorCacheTaskLocation =>
pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
+ pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
case e: HDFSCacheTaskLocation =>
val exe = sched.getExecutorsAliveOnHost(loc.host)
exe match {
case Some(set) =>
for (e <- set) {
pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
}
+ pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
logInfo(s"Pending task $index has a cached location at ${e.host} " +
", where there are executors " + set.mkString(","))
case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
", but there are no executors alive there.")
}
- case _ =>
+ case _: HostTaskLocation =>
+ val exe = sched.getExecutorsAliveOnHost(loc.host)
+ exe match {
+ case Some(_) =>
+ pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
+ case _ => logDebug(s"Pending task $index has a location at ${loc.host} " +
+ ", but there are no executors alive there.")
+ }
}
- pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
Review comment:
I'm sorry, I may not have described the situation I encountered clearly.
Please take a look at this example, which queries a Hive partition.
`dataset.persist(StorageLevel.MEMORY_AND_DISK())` is the important statement:
it is what gives the tasks preferred locations.
```java
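import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;

// In the online environment, Hive partition data is actually imported into TiDB;
// the code logic can be simplified as follows.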
SparkSession testApp = SparkSession.builder()
.master("local[*]")
.appName("test app")
.enableHiveSupport()
.getOrCreate();
Dataset<Row> dataset = testApp.sql("select * from default.test where dt = '20211129'");
dataset.persist(StorageLevel.MEMORY_AND_DISK());
dataset.count();
```
Here is the content of `forHost` as captured by my runtime hook:
```
forHost=@HashMap[
serialVersionUID=@Long[1],
_loadFactor=@Integer[750],
table=@HashEntry[][
@DefaultEntry[(kv: hdfs-loc-1, ArrayBuffer(2, 1))],
null,
null,
null,
null,
null,
null,
null,
null,
@DefaultEntry[(kv: driver-host, ArrayBuffer())],
null,
null,
null,
null,
@DefaultEntry[(kv: hdfs-loc-2, ArrayBuffer(2, 1))],
@DefaultEntry[(kv: hdfs-loc-3, ArrayBuffer(2, 1))],
],
```
The preferred locations of tasks 1 and 2 are `HostTaskLocation`s pointing to
`hdfs-loc-1`, `hdfs-loc-2` and `hdfs-loc-3`.
However, these three hosts are only the machines that hold the HDFS data blocks.
No Spark worker and no NodeManager runs on any of them.
`getAllowedLocalityLevel` will only return `NODE_LOCAL` in this case, so the
task can never actually be executed.
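
To make the difference concrete, here is a minimal, self-contained sketch of the two indexing strategies. It is a simplified model, not Spark's code: the class `PendingTaskIndexSketch` and its methods are hypothetical names, and it only models host-only preferred locations (the `HostTaskLocation` case in the diff).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the per-host pending-task index.
// It is NOT Spark's TaskSetManager; it only illustrates the idea in the diff.
class PendingTaskIndexSketch {

    // Behaviour before the change: every preferred host gets an entry,
    // whether or not any executor is alive on that host.
    static Map<String, List<Integer>> indexAllHosts(Map<Integer, List<String>> preferredHosts) {
        Map<String, List<Integer>> forHost = new HashMap<>();
        preferredHosts.forEach((task, hosts) ->
            hosts.forEach(h -> forHost.computeIfAbsent(h, k -> new ArrayList<>()).add(task)));
        return forHost;
    }

    // Behaviour proposed in the diff: a host-only location is indexed
    // only if at least one executor is alive on that host.
    static Map<String, List<Integer>> indexLiveHosts(Map<Integer, List<String>> preferredHosts,
                                                     Set<String> hostsWithExecutors) {
        Map<String, List<Integer>> forHost = new HashMap<>();
        preferredHosts.forEach((task, hosts) ->
            hosts.stream()
                 .filter(hostsWithExecutors::contains)
                 .forEach(h -> forHost.computeIfAbsent(h, k -> new ArrayList<>()).add(task)));
        return forHost;
    }

    public static void main(String[] args) {
        // Tasks 1 and 2 prefer the three HDFS hosts from the dump above.
        Map<Integer, List<String>> preferred = new HashMap<>();
        preferred.put(1, List.of("hdfs-loc-1", "hdfs-loc-2", "hdfs-loc-3"));
        preferred.put(2, List.of("hdfs-loc-1", "hdfs-loc-2", "hdfs-loc-3"));

        // Assumption for the sketch: the only host with an executor is the driver host.
        Set<String> live = Set.of("driver-host");

        System.out.println("all hosts:  " + indexAllHosts(preferred).keySet());
        System.out.println("live hosts: " + indexLiveHosts(preferred, live).keySet());
    }
}
```

In this model the first index contains the three HDFS hosts even though nothing can run there, while the second index is empty for them, so such tasks would no longer look as if they had a reachable node-local placement.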