attilapiros commented on a change in pull request #35881:
URL: https://github.com/apache/spark/pull/35881#discussion_r829167205



##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -780,15 +805,53 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
       (scheduler.sc.resourceProfileManager.resourceProfileFromId(rpid), num)
     }
     val response = synchronized {
+      val oldResourceProfileToNumExecutors = 
requestedTotalExecutorsPerResourceProfile.map {
+        case (rp, num) =>
+          (rp.id, num)
+      }.toMap
       this.requestedTotalExecutorsPerResourceProfile.clear()
       this.requestedTotalExecutorsPerResourceProfile ++= 
resourceProfileToNumExecutors
       this.numLocalityAwareTasksPerResourceProfileId = 
numLocalityAwareTasksPerResourceProfileId
       this.rpHostToLocalTaskCount = hostToLocalTaskCount
+      updateExecRequestTimes(oldResourceProfileToNumExecutors, 
resourceProfileIdToNumExecutors)
       doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
     }
     defaultAskTimeout.awaitResult(response)
   }
 
+  private def updateExecRequestTimes(oldProfile: Map[Int, Int], newProfile: 
Map[Int, Int]): Unit = {
+    newProfile.map {
+      case (k, v) =>
+        val delta = v - oldProfile.getOrElse(k, 0)
+        if (delta != 0) {
+          updateExecRequestTime(k, delta)
+        }
+    }

Review comment:
       Should we process those keys (resource profiles) which is in the 
`oldProfile` but missing from the `newProfile`?
   

##########
File path: 
core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
##########
@@ -271,8 +282,128 @@ class CoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite with LocalSparkCo
     }
     sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis)
     assert(executorAddedCount === 3)
+    infos.foreach { info =>
+      assert(info.requestTime.get > 0,
+        "Exec allocation and request times don't make sense")
+      assert(info.requestTime.get > testStartTime,
+        "Exec allocation and request times don't make sense")
+      assert(info.registrationTime.get > info.requestTime.get,
+        "Exec allocation and request times don't make sense")
+    }
   }
 
+  test("exec alloc decrease.") {
+
+    val testStartTime = System.currentTimeMillis()
+
+    val execCores = 3
+    val conf = new SparkConf()
+      .set(EXECUTOR_CORES, execCores)
+      .set(SCHEDULER_REVIVE_INTERVAL.key, "1m") // don't let it auto revive 
during test
+      .set(EXECUTOR_INSTANCES, 0) // avoid errors about duplicate executor 
registrations
+      .setMaster(
+      
"coarseclustermanager[org.apache.spark.scheduler.TestCoarseGrainedSchedulerBackend]")
+      .setAppName("test")
+    conf.set(TASK_GPU_ID.amountConf, "1")
+    conf.set(EXECUTOR_GPU_ID.amountConf, "1")
+
+    sc = new SparkContext(conf)
+    val execGpu = new ExecutorResourceRequests().cores(1).resource(GPU, 3)
+    val taskGpu = new TaskResourceRequests().cpus(1).resource(GPU, 1)
+    val rp = new ResourceProfile(execGpu.requests, taskGpu.requests)
+    sc.resourceProfileManager.addResourceProfile(rp)
+    assert(rp.id > ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    val backend = 
sc.schedulerBackend.asInstanceOf[TestCoarseGrainedSchedulerBackend]
+    // Note we get two in default profile and one in the new rp
+    // we need to put a req time in for all of them.
+    backend.requestTotalExecutors(Map((rp.id, 1)), Map(), Map())
+    // Decrease the number of execs requested in the new rp.
+    backend.requestTotalExecutors(Map((rp.id, 0)), Map(), Map())
+    // Request execs in the default profile.
+    backend.requestExecutors(3)
+    val mockEndpointRef = mock[RpcEndpointRef]
+    val mockAddress = mock[RpcAddress]
+    when(mockEndpointRef.send(LaunchTask)).thenAnswer((_: InvocationOnMock) => 
{})
+
+    val resources = Map(GPU -> new ResourceInformation(GPU, Array("0", "1", 
"3")))
+
+    var executorAddedCount: Int = 0
+    val infos = scala.collection.mutable.ArrayBuffer[ExecutorInfo]()
+    val listener = new SparkListener() {
+      override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): 
Unit = {
+        // Lets check that the exec allocation times "make sense"
+        val info = executorAdded.executorInfo
+        infos += info
+        executorAddedCount += 1
+      }
+    }
+
+    sc.addSparkListener(listener)
+
+    backend.driverEndpoint.askSync[Boolean](
+      RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, Map.empty, 
Map.empty, resources,
+        ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
+    backend.driverEndpoint.askSync[Boolean](
+      RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, Map.empty, 
Map.empty, resources,
+        ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
+    backend.driverEndpoint.askSync[Boolean](
+      RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, Map.empty, 
Map.empty, resources,
+        rp.id))
+
+    val frameSize = RpcUtils.maxMessageSizeBytes(sc.conf)
+    val bytebuffer = java.nio.ByteBuffer.allocate(frameSize - 100)
+    val buffer = new SerializableBuffer(bytebuffer)
+
+    var execResources = backend.getExecutorAvailableResources("1")
+    assert(execResources(GPU).availableAddrs.sorted === Array("0", "1", "3"))
+
+    val exec3ResourceProfileId = backend.getExecutorResourceProfileId("3")
+    assert(exec3ResourceProfileId === rp.id)
+
+    val taskResources = Map(GPU -> new ResourceInformation(GPU, Array("0")))
+    val taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 
0, "1",
+      "t1", 0, 1, mutable.Map.empty[String, Long],
+      mutable.Map.empty[String, Long], mutable.Map.empty[String, Long],
+      new Properties(), 1, taskResources, bytebuffer)))
+    val ts = backend.getTaskSchedulerImpl()
+    when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], 
any[Boolean])).thenReturn(taskDescs)
+
+    backend.driverEndpoint.send(ReviveOffers)
+
+    eventually(timeout(5 seconds)) {
+      execResources = backend.getExecutorAvailableResources("1")
+      assert(execResources(GPU).availableAddrs.sorted === Array("1", "3"))
+      assert(execResources(GPU).assignedAddrs === Array("0"))
+    }
+
+    // To avoid allocating any resources immediately after releasing the 
resource from the task to
+    // make sure that `availableAddrs` below won't change
+    when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], 
any[Boolean])).thenReturn(Seq.empty)
+    backend.driverEndpoint.send(
+      StatusUpdate("1", 1, TaskState.FINISHED, buffer, taskResources))
+
+    eventually(timeout(5 seconds)) {
+      execResources = backend.getExecutorAvailableResources("1")
+      assert(execResources(GPU).availableAddrs.sorted === Array("0", "1", "3"))
+      assert(execResources(GPU).assignedAddrs.isEmpty)
+    }
+    sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis)
+    assert(executorAddedCount === 3)
+    infos.foreach { info =>
+      info.requestTime.map { t =>
+        assert(t > 0,
+          "Exec request times don't make sense")
+        assert(t >= testStartTime,
+          "Exec allocation and request times don't make sense")
+        assert(t >= info.requestTime.get,
+          "Exec allocation and request times don't make sense")

Review comment:
       This is always true (I mean both are referencing the same value 
as`info.requestTime.get == info.requestTime.get`).
   You probably meant to check the `registrationTime` here (as it was done in 
the end of the previous test).

##########
File path: 
core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
##########
@@ -271,8 +282,128 @@ class CoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite with LocalSparkCo
     }
     sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis)
     assert(executorAddedCount === 3)
+    infos.foreach { info =>
+      assert(info.requestTime.get > 0,
+        "Exec allocation and request times don't make sense")
+      assert(info.requestTime.get > testStartTime,
+        "Exec allocation and request times don't make sense")
+      assert(info.registrationTime.get > info.requestTime.get,
+        "Exec allocation and request times don't make sense")
+    }
   }
 
+  test("exec alloc decrease.") {

Review comment:
       Nit:
   ```suggestion
     test("decreasing the number of requested executors") {
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to