This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new c8e7eb1 [SPARK-26774][CORE] Update some docs on TaskSchedulerImpl. c8e7eb1 is described below commit c8e7eb1fa7b504ececfb36aa48860762dc747351 Author: Imran Rashid <iras...@cloudera.com> AuthorDate: Thu Feb 28 11:30:20 2019 -0800 [SPARK-26774][CORE] Update some docs on TaskSchedulerImpl. A couple of places in TaskSchedulerImpl could use a minor doc update on threading concerns. There is one bug fix here, but only in sc.killTaskAttempt() which is probably not used much. Closes #23874 from squito/SPARK-26774. Authored-by: Imran Rashid <iras...@cloudera.com> Signed-off-by: Marcelo Vanzin <van...@cloudera.com> --- .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index d551fb7..3f23bfe 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -51,7 +51,12 @@ import org.apache.spark.util.{AccumulatorV2, SystemClock, ThreadUtils, Utils} * threads, so it needs locks in public API methods to maintain its state. In addition, some * [[SchedulerBackend]]s synchronize on themselves when they want to send events here, and then * acquire a lock on us, so we need to make sure that we don't try to lock the backend while - * we are holding a lock on ourselves. + * we are holding a lock on ourselves. 
This class is called from many threads, notably: + * * The DAGScheduler Event Loop + * * The RPCHandler threads, responding to status updates from Executors + * * Periodic revival of all offers from the CoarseGrainedSchedulerBackend, to accommodate delay + * scheduling + * * task-result-getter threads */ private[spark] class TaskSchedulerImpl( val sc: SparkContext, @@ -89,11 +94,12 @@ private[spark] class TaskSchedulerImpl( val CPUS_PER_TASK = conf.get(config.CPUS_PER_TASK) // TaskSetManagers are not thread safe, so any access to one should be synchronized - // on this class. + // on this class. Protected by `this` private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]] // Protected by `this` private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager] + // Protected by `this` val taskIdToExecutorId = new HashMap[Long, String] @volatile private var hasReceivedTask = false @@ -254,7 +260,10 @@ private[spark] class TaskSchedulerImpl( } } - override def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean = { + override def killTaskAttempt( + taskId: Long, + interruptThread: Boolean, + reason: String): Boolean = synchronized { logInfo(s"Killing task $taskId: $reason") val execId = taskIdToExecutorId.get(taskId) if (execId.isDefined) { @@ -825,9 +834,10 @@ private[spark] class TaskSchedulerImpl( override def applicationAttemptId(): Option[String] = backend.applicationAttemptId() + // exposed for testing private[scheduler] def taskSetManagerForAttempt( stageId: Int, - stageAttemptId: Int): Option[TaskSetManager] = { + stageAttemptId: Int): Option[TaskSetManager] = synchronized { for { attempts <- taskSetsByStageIdAndAttempt.get(stageId) manager <- attempts.get(stageAttemptId) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org