attilapiros commented on a change in pull request #24035: [SPARK-27112] : Spark
Scheduler encounters two independent Deadlocks …
URL: https://github.com/apache/spark/pull/24035#discussion_r264668141
##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -622,67 +633,107 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * @param countFailures if there are tasks running on the executors when they are killed, whether
    *                      those failures be counted to task failure limits?
    * @param force whether to force kill busy executors, default false
+   * @param blacklistingOnTaskCompletion whether the executors are being killed due to
+   *                                     blacklisting triggered by the task completion event
    * @return the ids of the executors acknowledged by the cluster manager to be removed.
    */
   final override def killExecutors(
       executorIds: Seq[String],
       adjustTargetNumExecutors: Boolean,
       countFailures: Boolean,
-      force: Boolean): Seq[String] = {
+      force: Boolean,
+      blacklistingOnTaskCompletion: Boolean): Seq[String] = {
     logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
-    val response = synchronized {
-      val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
-      unknownExecutors.foreach { id =>
-        logWarning(s"Executor to kill $id does not exist!")
-      }
-
-      // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
-      // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
-      val executorsToKill = knownExecutors
-        .filter { id => !executorsPendingToRemove.contains(id) }
-        .filter { id => force || !scheduler.isExecutorBusy(id) }
-      executorsToKill.foreach { id => executorsPendingToRemove(id) = !countFailures }
-
-      logInfo(s"Actual list of executor(s) to be killed is ${executorsToKill.mkString(", ")}")
-
-      // If we do not wish to replace the executors we kill, sync the target number of executors
-      // with the cluster manager to avoid allocating new ones. When computing the new target,
-      // take into account executors that are pending to be added or removed.
-      val adjustTotalExecutors =
-        if (adjustTargetNumExecutors) {
-          requestedTotalExecutors = math.max(requestedTotalExecutors - executorsToKill.size, 0)
-          if (requestedTotalExecutors !=
-              (numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)) {
-            logDebug(
-              s"""killExecutors($executorIds, $adjustTargetNumExecutors, $countFailures, $force):
-                 |Executor counts do not match:
-                 |requestedTotalExecutors = $requestedTotalExecutors
-                 |numExistingExecutors = $numExistingExecutors
-                 |numPendingExecutors = $numPendingExecutors
-                 |executorsPendingToRemove = ${executorsPendingToRemove.size}""".stripMargin)
-          }
-          doRequestTotalExecutors(requestedTotalExecutors)
-        } else {
-          numPendingExecutors += executorsToKill.size
-          Future.successful(true)
+    var response: Future[Seq[String]] = null
+    val idleExecutorIds = executorIds.filter { id => !scheduler.isExecutorBusy(id) }
+    if (!blacklistingOnTaskCompletion) {
Review comment:
Ok I see. I checked the first deadlock and I think the problem is in
`org.apache.spark.scheduler.TaskSchedulerImpl#isExecutorBusy`:
https://github.com/apache/spark/blob/b15423361bc28c4cd2216683eb852fdbec3ea58f/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L824-L826
That `synchronized` is too restrictive here for reading a snapshot of the state
of the `executorIdToRunningTaskIds` map. A solution for this problem could be to
use
[TrieMap](https://www.scala-lang.org/api/current/scala/collection/concurrent/TrieMap.html),
which the Scala docs describe as "a concurrent thread-safe lock-free
implementation of a hash array mapped trie".
If you change the type of `executorIdToRunningTaskIds` from `HashMap` to
`TrieMap`, you can remove the `synchronized` from `isExecutorBusy` (see the
sketch after the list below).
I have checked, and `isExecutorBusy` is only used in two places:
- `org.apache.spark.scheduler.TaskSchedulerImpl#resourceOffers`, where we are
already inside a `synchronized` block, so with the type change the behaviour
stays the same as before
- `org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#killExecutors`,
where we already live with a snapshot of the state that could be outdated right
after the method call
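
To make the type change concrete, here is a minimal, hypothetical sketch (the
object name and `main` method are just scaffolding for a runnable example, not
the actual `TaskSchedulerImpl` code):

```scala
import scala.collection.concurrent.TrieMap
import scala.collection.mutable

object TrieMapSketch {

  // Stand-in for TaskSchedulerImpl#executorIdToRunningTaskIds, backed by a
  // lock-free concurrent TrieMap instead of a mutable.HashMap that has to be
  // guarded by the scheduler's `synchronized` lock.
  private val executorIdToRunningTaskIds = TrieMap.empty[String, mutable.HashSet[Long]]

  // With a thread-safe map, the read-only key lookup no longer needs to take
  // the scheduler lock, so a concurrent killExecutors call cannot block on it.
  def isExecutorBusy(execId: String): Boolean =
    executorIdToRunningTaskIds.contains(execId)

  def main(args: Array[String]): Unit = {
    executorIdToRunningTaskIds.put("exec-1", mutable.HashSet(1L, 2L))
    println(isExecutorBusy("exec-1")) // true
    println(isExecutorBusy("exec-2")) // false
  }
}
```

The values would stay mutable `HashSet`s as today; only the key lookup becomes
lock-free, which is all `isExecutorBusy` needs.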
Regarding the second deadlock, I will continue my analysis.