agrawaldevesh commented on a change in pull request #29468:
URL: https://github.com/apache/spark/pull/29468#discussion_r473172376



##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -1062,25 +1062,38 @@ private[spark] class TaskSchedulerImpl(
   }
 
   def getExecutorsAliveOnHost(host: String): Option[Set[String]] = synchronized {
-    hostToExecutors.get(host).map(_.toSet)
+    hostToExecutors.get(host).map(_.filterNot(isExecutorDecommissioned)).map(_.toSet)
   }
 
   def hasExecutorsAliveOnHost(host: String): Boolean = synchronized {
-    hostToExecutors.contains(host)
+    hostToExecutors.get(host)
+      .exists(executors => executors.exists(e => !isExecutorDecommissioned(e)))
   }
 
   def hasHostAliveOnRack(rack: String): Boolean = synchronized {
-    hostsByRack.contains(rack)
+    hostsByRack.get(rack)
+      .exists(hosts => hosts.exists(h => !isHostDecommissioned(h)))
   }
 
   def isExecutorAlive(execId: String): Boolean = synchronized {
-    executorIdToRunningTaskIds.contains(execId)
+    executorIdToRunningTaskIds.contains(execId) && !isExecutorDecommissioned(execId)
   }
 
   def isExecutorBusy(execId: String): Boolean = synchronized {
     executorIdToRunningTaskIds.get(execId).exists(_.nonEmpty)
   }
 
+  // exposed for test

Review comment:
       nit: Should these two methods be declared `final`? We don't really want to override them; instead, we want to override the helper methods they call.
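   
   Something like this, for example (just a sketch, assuming the two methods in question are the new decommission checks added in this hunk):
   
   ```scala
   // exposed for test; final so that subclasses override the helpers this calls
   // (e.g. getExecutorDecommissionInfo) rather than this entry point itself
   protected final def isExecutorDecommissioned(execId: String): Boolean =
     getExecutorDecommissionInfo(execId).nonEmpty || isHostDecommissioned(executorIdToHost(execId))
   ```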

##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -1062,25 +1062,38 @@ private[spark] class TaskSchedulerImpl(
   }
 
   def getExecutorsAliveOnHost(host: String): Option[Set[String]] = synchronized {
-    hostToExecutors.get(host).map(_.toSet)
+    hostToExecutors.get(host).map(_.filterNot(isExecutorDecommissioned)).map(_.toSet)
   }
 
   def hasExecutorsAliveOnHost(host: String): Boolean = synchronized {
-    hostToExecutors.contains(host)
+    hostToExecutors.get(host)
+      .exists(executors => executors.exists(e => !isExecutorDecommissioned(e)))
   }
 
   def hasHostAliveOnRack(rack: String): Boolean = synchronized {
-    hostsByRack.contains(rack)
+    hostsByRack.get(rack)
+      .exists(hosts => hosts.exists(h => !isHostDecommissioned(h)))
   }
 
   def isExecutorAlive(execId: String): Boolean = synchronized {
-    executorIdToRunningTaskIds.contains(execId)
+    executorIdToRunningTaskIds.contains(execId) && !isExecutorDecommissioned(execId)
   }
 
   def isExecutorBusy(execId: String): Boolean = synchronized {
     executorIdToRunningTaskIds.get(execId).exists(_.nonEmpty)
   }
 
+  // exposed for test
+  protected def isExecutorDecommissioned(execId: String): Boolean =
+    getExecutorDecommissionInfo(execId).nonEmpty || isHostDecommissioned(executorIdToHost(execId))

Review comment:
       Please consider removing the `isHostDecommissioned` call here: when a host is decommissioned, all executors on that host will be marked decommissioned and will eventually show up via `getExecutorDecommissionInfo(execId).nonEmpty`.
   
   I would really like the `TaskSchedulerImpl` not to make its own judgement of when an executor is decommissioned, but rather let that be dictated by the `CoarseGrainedSchedulerBackend`/cluster manager. Otherwise I am afraid the two can get out of sync.
   
   I understand the intention, but I think it is a bit overzealous: executors on decommissioned hosts will eventually be reflected in `getExecutorDecommissionInfo`.
   
   I verified that your tests still pass after the call is removed.
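   
   In other words, roughly (a sketch; everything else stays as in your change):
   
   ```scala
   // exposed for test
   protected def isExecutorDecommissioned(execId: String): Boolean =
     getExecutorDecommissionInfo(execId).nonEmpty
   ```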

##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -653,6 +652,33 @@ class TaskSetManagerSuite
     assert(manager.resourceOffer("execA", "host1", ANY)._1.isDefined)
   }
 
+  test("SPARK-32653: Decommissioned host/executor should not be used to 
calculate locality " +
+    "levels") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc)
+    val backend = mock(classOf[SchedulerBackend])
+    doNothing().when(backend).reviveOffers()
+    sched.initialize(backend)
+
+    val exec0 = "exec0"
+    val exec1 = "exec1"
+    val host0 = "host0"
+    sched.addExecutor(exec0, host0)
+    sched.addExecutor(exec1, host0)
+
+    val taskSet = FakeTask.createTaskSet(2,
+      Seq(TaskLocation(host0, exec0)),
+      Seq(TaskLocation(host0, exec1)))
+    sched.submitTasks(taskSet)
+    val manager = sched.taskSetManagerForAttempt(0, 0).get
+
+    assert(manager.myLocalityLevels === Array(PROCESS_LOCAL, NODE_LOCAL, ANY))
+
+    sched.executorDecommission(exec0, ExecutorDecommissionInfo("test", true))

Review comment:
       I think you should also call `sched.executorDecommission(exec1, ExecutorDecommissionInfo("test", true))` (i.e., for exec1), to mimic the production behavior, which would always decommission all executors on a decommissioned host.
   
   In addition, I think we should also test decommissioning an executor when the entire host hasn't been taken down.
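   
   Something along these lines (a sketch; I'm assuming the second `ExecutorDecommissionInfo` argument is the host-decommission flag, as in your existing call):
   
   ```scala
   // Mimic production: decommissioning a host decommissions every executor on it.
   sched.executorDecommission(exec0, ExecutorDecommissionInfo("test", true))
   sched.executorDecommission(exec1, ExecutorDecommissionInfo("test", true))
   
   // A separate case worth covering: only exec0 is decommissioned while host0 stays up.
   // sched.executorDecommission(exec0, ExecutorDecommissionInfo("test", false))
   ```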




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
