Ngone51 commented on a change in pull request #23377: [SPARK-26439][CORE][WIP] Introduce WorkerOffer reservation mechanism for Barrier TaskSet
URL: https://github.com/apache/spark/pull/23377#discussion_r243971256
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##########
 @@ -303,45 +316,136 @@ private[spark] class TaskSchedulerImpl(
       s" ${manager.parent.name}")
   }
 
+  private def addRunningTask(tid: Long, taskSet: TaskSetManager, execId: String): Unit = {
+    taskIdToTaskSetManager.put(tid, taskSet)
+    taskIdToExecutorId(tid) = execId
+    executorIdToRunningTaskIds(execId).add(tid)
+  }
+
+  private def getSortedBarrierTaskSets: ArrayBuffer[TaskSetManager] = {
+    rootPool.getSortedTaskSetQueue.filter(_.isBarrier)
+  }
+
+  private def excludeReservedCpus(offers: Seq[WorkerOffer]): Array[Int] = {
+    excludeReservedCpus(offers.toIndexedSeq, getSortedBarrierTaskSets)
+  }
+
+  private def excludeReservedCpus(
+    offers: IndexedSeq[WorkerOffer],
+    barrierTaskSets: ArrayBuffer[TaskSetManager])
+    : Array[Int] = {
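+    // Count, per executor, how many reserved WorkerOffers the barrier taskSets are holding,
+    // so the CPUs they pin can be excluded from each offer's free cores below.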
+    val execIdToReadyTaskNum = barrierTaskSets.map { ts =>
+      val execIdToTaskNum = ts.getReadyTaskToReservedWorkerOffer.map {
+        case (_, (_, reservedWorkerOffer)) =>
+          (reservedWorkerOffer.execId, 1)
+      }.groupBy { case (execId, _) =>
+        execId
+      }.map { case (execId, taskList) =>
+        val sum = taskList.map { case (_, num) => num }.sum
+        (execId, sum)
+      }
+      execIdToTaskNum
+    }.foldLeft(HashMap[String, Int]()) { case (resMap, execIdToTaskNum) =>
+      execIdToTaskNum.foreach { case (execId, num) =>
+        resMap += (execId -> (num + resMap.getOrElse(execId, 0)))
+      }
+      resMap
+    }
+    offers.map { o =>
+      val reservedCpus = execIdToReadyTaskNum.getOrElse(o.executorId, 0) * CPUS_PER_TASK
+      o.cores - reservedCpus
+    }.toArray
+  }
+
+  private def releaseReservedWorkerOfferIfNecessary(alreadyReleased: Int): Unit = {
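+    // Estimate how many more reserved WorkerOffers the barrier taskSets still need; if the
+    // slots expected to be freed soon won't cover it, force-release reservations held by the
+    // lower-priority barrier taskSets.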
+    val barrierTaskSets = getSortedBarrierTaskSets
+    val needed =
+      barrierTaskSets.map(ts => ts.numTasks - ts.getReadyTaskToReservedWorkerOffer.size).sum
+    // roughly take waiting time into account by only counting half of the running tasks
+    val canBeReleased = runningTasksByExecutors.values.sum / 2
+    var extraNeeded = math.max(needed - alreadyReleased - canBeReleased, 0)
+    // start to force release reserved WorkerOffer from the last barrier taskSet
+    // in the sorted queue
+    var index = barrierTaskSets.size
+    // no need to release reserved WorkerOffer for the first barrier taskSet
+    while (extraNeeded > 0 && index > 1) {
+      index -= 1
+      val bts = barrierTaskSets(index)
+      val readyTaskNum = bts.getReadyTaskToReservedWorkerOffer.size
+      if (readyTaskNum <= extraNeeded) {
+        bts.releaseReservedWorkerOffer()
+      } else {
+        bts.releaseReservedWorkerOfferByLocality(extraNeeded)
+      }
+      extraNeeded -= readyTaskNum
+    }
+  }
+
   private def resourceOfferSingleTaskSet(
       taskSet: TaskSetManager,
       maxLocality: TaskLocality,
       shuffledOffers: Seq[WorkerOffer],
       availableCpus: Array[Int],
-      tasks: IndexedSeq[ArrayBuffer[TaskDescription]],
-      addressesWithDescs: ArrayBuffer[(String, TaskDescription)]) : Boolean = {
+      execIdToOfferIndex: Map[String, Int],
+      tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
     var launchedTask = false
+    val reviveOffers = new ArrayBuffer[WorkerOffer] ++ shuffledOffers
+    var dynamicAvailableCpus = availableCpus
     // nodes and executors that are blacklisted for the entire application have already been
     // filtered out by this point
-    for (i <- 0 until shuffledOffers.size) {
-      val execId = shuffledOffers(i).executorId
-      val host = shuffledOffers(i).host
-      if (availableCpus(i) >= CPUS_PER_TASK) {
-        try {
-          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
-            tasks(i) += task
-            val tid = task.taskId
-            taskIdToTaskSetManager.put(tid, taskSet)
-            taskIdToExecutorId(tid) = execId
-            executorIdToRunningTaskIds(execId).add(tid)
-            availableCpus(i) -= CPUS_PER_TASK
-            assert(availableCpus(i) >= 0)
-            // Only update hosts for a barrier task.
-            if (taskSet.isBarrier) {
-              // The executor address is expected to be non empty.
-              addressesWithDescs += (shuffledOffers(i).address.get -> task)
+    while (reviveOffers.nonEmpty) {
+      val addedReviveOffersPerRound = new ArrayBuffer[WorkerOffer]()
+      var replaced = false
+      for (i <- 0 until shuffledOffers.size if reviveOffers.contains(shuffledOffers(i))) {
+        val execId = shuffledOffers(i).executorId
+        val host = shuffledOffers(i).host
+        if (dynamicAvailableCpus(i) >= CPUS_PER_TASK) {
+          try {
+            for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
+              dynamicAvailableCpus(i) -= CPUS_PER_TASK
+              launchedTask = true
+              // Only update hosts for a barrier task.
+              if (taskSet.isBarrier) {
+                if (task.executorId != execId) {
+                  replaced = true
+                  val replacedExecId = task.executorId
+                  val offerIndex = execIdToOfferIndex(replacedExecId)
+                  if (offerIndex < i) {
+                    // give the WorkerOffer, whose resource was just reclaimed, a second
+                    // chance to offer it to the taskSet
+                    addedReviveOffersPerRound += shuffledOffers(offerIndex)
+                  }
+                }
+                // don't do another resourceOffer round if we've already achieved the goal,
+                // even if we have newly added reviveOffers in this round, which may provide
+                // better locality preference, but that's not guaranteed.
+                if (taskSet.getReadyTaskToReservedWorkerOffer.size == taskSet.numTasks) {
+                  return launchedTask
 
 Review comment:
   Maybe we should remove this early return to allow better locality-aware scheduling.
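   To illustrate the trade-off (a self-contained toy, not the PR's code; `Offer`, `localityScore`, `assign`, `slots`, and `stopEarly` are made up for this sketch): returning as soon as every task has a reservation locks in the first offers that fit, whereas finishing the scan lets a later offer with better locality replace an earlier pick.

```scala
object EarlyReturnVsFullScan {
  // Hypothetical stand-in for a WorkerOffer plus a locality score for one task.
  case class Offer(execId: String, localityScore: Int)

  // Pick `slots` offers. With stopEarly = true we return as soon as the slots are
  // filled (like the `return launchedTask` above); with stopEarly = false we keep
  // scanning and swap in offers with better locality.
  def assign(offers: Seq[Offer], slots: Int, stopEarly: Boolean): Seq[Offer] = {
    var picked = Vector.empty[Offer]
    for (o <- offers) {
      if (picked.size < slots) {
        picked = picked :+ o
        if (stopEarly && picked.size == slots) return picked
      } else {
        // all slots filled: keep scanning and upgrade the worst-locality pick
        val worstIdx = picked.indices.minBy(picked(_).localityScore)
        if (o.localityScore > picked(worstIdx).localityScore) {
          picked = picked.updated(worstIdx, o)
        }
      }
    }
    picked
  }

  def main(args: Array[String]): Unit = {
    val offers = Seq(Offer("exec-1", 1), Offer("exec-2", 1), Offer("exec-3", 3))
    println(assign(offers, 2, stopEarly = true))   // Vector(Offer(exec-1,1), Offer(exec-2,1))
    println(assign(offers, 2, stopEarly = false))  // Vector(Offer(exec-3,3), Offer(exec-2,1))
  }
}
```

   Of course the real scheduler also has to respect CPUS_PER_TASK and the reservation bookkeeping, so this only shows the shape of the change.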
