Ngone51 commented on a change in pull request #28656:
URL: https://github.com/apache/spark/pull/28656#discussion_r431230286
##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -1107,10 +1107,16 @@ private[spark] class TaskSetManager(
def recomputeLocality(): Unit = {
// A zombie TaskSetManager may reach here while executorLost happens
if (isZombie) return
+ val previousLocalityIndex = currentLocalityIndex
val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
myLocalityLevels = computeValidLocalityLevels()
localityWaits = myLocalityLevels.map(getLocalityWait)
currentLocalityIndex = getLocalityIndex(previousLocalityLevel)
+ if (currentLocalityIndex > previousLocalityIndex) {
+ // SPARK-31837: there's new higher level locality, so shift to
+ // the highest locality level in terms of better data locality
+ currentLocalityIndex = 0
Review comment:
My only concern with this change is whether, in general cases, the task
set would spend more time in delay scheduling and thereby reduce the
throughput of the cluster. But considering the throughput improvement we
gain from the new version of delay scheduling, I think it should not be a
big problem.
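The index arithmetic behind the change, and the extra delay-scheduling time the comment worries about, can be illustrated with a small standalone model. This is a minimal, hypothetical sketch. Locality levels are plain strings ordered like Spark's `TaskLocality` values. `localityIndex` only loosely mirrors `getLocalityIndex`. The names `LocalityResetSketch`, `recompute` and `worstCaseExtraWaitMs` are invented for illustration and are not Spark's API. The 3000 ms per-level wait is an assumed value, with no wait at `ANY`.

```scala
// Hypothetical, simplified model of the recomputeLocality() change above.
// Locality levels are plain strings here; this is NOT Spark's API.
object LocalityResetSketch {
  // Ordered from best to worst data locality, following the order of
  // Spark's TaskLocality values.
  val Order: Seq[String] =
    Seq("PROCESS_LOCAL", "NODE_LOCAL", "NO_PREF", "RACK_LOCAL", "ANY")

  private def rank(level: String): Int = Order.indexOf(level)

  // First index whose level is no better than `level` (loosely mirrors
  // getLocalityIndex). Assumes `levels` is sorted by rank and ends with "ANY".
  def localityIndex(levels: Seq[String], level: String): Int =
    levels.indexWhere(l => rank(l) >= rank(level))

  // Recomputes the current index after the set of valid levels changes.
  // With resetOnNewLevel = true it applies the SPARK-31837 idea: if a better
  // level appeared in front of the previous one, restart at index 0.
  def recompute(
      oldLevels: Seq[String],
      currentIndex: Int,
      newLevels: Seq[String],
      resetOnNewLevel: Boolean): Int = {
    val previousLevel = oldLevels(currentIndex)
    val newIndex = localityIndex(newLevels, previousLevel)
    if (resetOnNewLevel && newIndex > currentIndex) 0 else newIndex
  }

  // Worst-case extra delay (ms) before the task set may fall back to
  // `targetIndex` again after restarting at `fromIndex`, assuming every
  // per-level wait expires in full (ignores any timer resets).
  def worstCaseExtraWaitMs(waits: Seq[Long], fromIndex: Int, targetIndex: Int): Long =
    waits.slice(fromIndex, targetIndex).sum

  def main(args: Array[String]): Unit = {
    // The task set had already fallen back to ANY (index 1) ...
    val oldLevels = Seq("NODE_LOCAL", "ANY")
    // ... when an executor arrives that makes PROCESS_LOCAL possible.
    val newLevels = Seq("PROCESS_LOCAL", "NODE_LOCAL", "ANY")
    val withReset = recompute(oldLevels, 1, newLevels, resetOnNewLevel = true)
    val withoutReset = recompute(oldLevels, 1, newLevels, resetOnNewLevel = false)
    println(s"with reset: $withReset, without reset: $withoutReset")
    // Assumed waits: 3000 ms for each non-ANY level, 0 ms for ANY.
    val waits = Seq(3000L, 3000L, 0L)
    val extra = worstCaseExtraWaitMs(waits, withReset, withoutReset)
    println(s"worst-case extra wait: $extra ms")
  }
}
```

Running `main` prints `with reset: 0, without reset: 2` and `worst-case extra wait: 6000 ms`. Without the reset, the task set stays at `ANY`. With it, the task set tries `PROCESS_LOCAL` first, which costs up to the sum of the skipped per-level waits. That cost is the throughput concern raised above. When the levels do not change, the recomputed index equals the previous one, so no reset happens.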