pgandhi999 commented on a change in pull request #24035: [SPARK-27112] : Spark Scheduler encounters two independent Deadlocks …
URL: https://github.com/apache/spark/pull/24035#discussion_r264797229
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ##########
 @@ -622,67 +633,107 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   * @param countFailures if there are tasks running on the executors when they are killed, whether
   *                      those failures be counted to task failure limits?
   * @param force whether to force kill busy executors, default false
+   * @param blacklistingOnTaskCompletion whether the executors are being killed due to
+   *                                     blacklisting triggered by the task completion event
   * @return the ids of the executors acknowledged by the cluster manager to be removed.
   final override def killExecutors(
       executorIds: Seq[String],
       adjustTargetNumExecutors: Boolean,
       countFailures: Boolean,
-      force: Boolean): Seq[String] = {
+      force: Boolean,
+    blacklistingOnTaskCompletion: Boolean): Seq[String] = {
     logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
 
-    val response = synchronized {
-      val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
-      unknownExecutors.foreach { id =>
-        logWarning(s"Executor to kill $id does not exist!")
-      }
-
-      // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
-      // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
-      val executorsToKill = knownExecutors
-        .filter { id => !executorsPendingToRemove.contains(id) }
-        .filter { id => force || !scheduler.isExecutorBusy(id) }
-      executorsToKill.foreach { id => executorsPendingToRemove(id) = !countFailures }
-
-      logInfo(s"Actual list of executor(s) to be killed is ${executorsToKill.mkString(", ")}")
-
-      // If we do not wish to replace the executors we kill, sync the target number of executors
-      // with the cluster manager to avoid allocating new ones. When computing the new target,
-      // take into account executors that are pending to be added or removed.
-      val adjustTotalExecutors =
-        if (adjustTargetNumExecutors) {
-          requestedTotalExecutors = math.max(requestedTotalExecutors - executorsToKill.size, 0)
-          if (requestedTotalExecutors !=
-              (numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)) {
-            logDebug(
-              s"""killExecutors($executorIds, $adjustTargetNumExecutors, $countFailures, $force):
-                 |Executor counts do not match:
-                 |requestedTotalExecutors  = $requestedTotalExecutors
-                 |numExistingExecutors     = $numExistingExecutors
-                 |numPendingExecutors      = $numPendingExecutors
-                 |executorsPendingToRemove = ${executorsPendingToRemove.size}""".stripMargin)
-          }
-          doRequestTotalExecutors(requestedTotalExecutors)
-        } else {
-          numPendingExecutors += executorsToKill.size
-          Future.successful(true)
+    var response: Future[Seq[String]] = null
+    val idleExecutorIds = executorIds.filter { id => !scheduler.isExecutorBusy(id) }
+    if (!blacklistingOnTaskCompletion) {
 
 Review comment:
   @squito I agree with you and @attilapiros about creating an ordering. I shall definitely follow the approach and try it out.
   
   Regarding your comment on the deadlock between `makeOffersLock` and the task-result-getter thread: that should not happen, because the task-result-getter thread never competes for `makeOffersLock`. The flag `blacklistingOnTaskCompletion` was added precisely to ensure that the task-result-getter thread never acquires `makeOffersLock`.
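
   To make that intent concrete, here is a minimal, self-contained sketch (the class name `KillPathSketch` and helper `markPendingToRemove` are invented for illustration, not the actual patch) of how a caller-supplied flag can keep the task-result-getter path off `makeOffersLock` entirely, so that thread can never join a lock-ordering cycle involving it:

   ```scala
   // Hypothetical sketch only: a flag routes one caller around a lock so that
   // the calling thread never participates in a cycle involving that lock.
   class KillPathSketch {
     private val makeOffersLock = new Object

     def killExecutors(ids: Seq[String], blacklistingOnTaskCompletion: Boolean): Seq[String] = {
       if (blacklistingOnTaskCompletion) {
         // Task-result-getter path: never touch makeOffersLock, so this thread
         // cannot deadlock with threads that already hold it.
         markPendingToRemove(ids)
       } else {
         // Dynamic-allocation path: safe to serialize against makeOffers() here.
         makeOffersLock.synchronized {
           markPendingToRemove(ids)
         }
       }
     }

     // Placeholder for the real bookkeeping (executorsPendingToRemove, etc.).
     private def markPendingToRemove(ids: Seq[String]): Seq[String] = ids
   }
   ```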
   
   Also, you are right in saying that `makeOffersLock` does not solve the deadlock. I have explained the purpose of `makeOffersLock` in my comment below. Quoting it here:
   
   > I can explain more about the makeOffersLock here.
   > 
   > PR #17091 introduced synchronization on the CoarseGrainedSchedulerBackend object in the method makeOffers(). That piece of code introduced a deadlock between the task-result-getter thread and the dispatcher-event-loop thread. I could simply remove the synchronized statement in makeOffers(), the deadlock would be resolved, and we would not really need makeOffersLock.
   > 
   > However, removing the synchronized statement would once again expose the race condition described in JIRA https://issues.apache.org/jira/browse/SPARK-19757, for which the fix in the corresponding PR was merged. makeOffersLock here serves as the solution to that problem. By synchronizing on makeOffersLock, the race condition between the dynamic-executor-allocation thread and the dispatcher-event-loop thread is avoided. That is indeed its sole purpose. I am, however, open to discussing and working on better solutions to the above problem, if any. Thank you.
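
   As a rough illustration of that idea (a self-contained sketch with invented names and state, not the code in this PR), a dedicated lock can serialize makeOffers() on the dispatcher-event-loop thread against killExecutors() on the dynamic-executor-allocation thread, while the backend-wide lock is only held for brief state access and always acquired after makeOffersLock:

   ```scala
   // Hypothetical, self-contained sketch of the locking scheme described above.
   object MakeOffersLockSketch {
     private val makeOffersLock = new Object
     private var executorFreeCores = Map("exec-1" -> 4, "exec-2" -> 4)
     private var executorsPendingToRemove = Set.empty[String]

     // Dispatcher-event-loop thread: offer resources only on executors that are
     // not concurrently being marked for removal (the SPARK-19757 race).
     def makeOffers(): Unit = makeOffersLock.synchronized {
       val active = synchronized {
         executorFreeCores.filter { case (id, _) => !executorsPendingToRemove.contains(id) }
       }
       println(s"Offering on: ${active.keys.mkString(", ")}")
     }

     // Dynamic-executor-allocation thread: mark executors for removal. Taking
     // makeOffersLock first keeps the lock order identical to makeOffers().
     def killExecutors(ids: Seq[String]): Unit = makeOffersLock.synchronized {
       synchronized { executorsPendingToRemove ++= ids }
       println(s"Pending removal: ${ids.mkString(", ")}")
     }
   }
   ```

   In this sketch there is a single lock order (makeOffersLock, then the object-wide lock), and a thread that never takes makeOffersLock, like the task-result-getter path described above, only ever needs the object-wide lock briefly, so no cycle can form.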
