attilapiros commented on a change in pull request #35881:
URL: https://github.com/apache/spark/pull/35881#discussion_r829081689
##########
File path:
core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
##########
@@ -271,8 +282,128 @@ class CoarseGrainedSchedulerBackendSuite extends
SparkFunSuite with LocalSparkCo
}
sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis)
assert(executorAddedCount === 3)
+ infos.foreach { info =>
+ assert(info.requestTime.get > 0,
+ "Exec allocation and request times don't make sense")
+ assert(info.requestTime.get > testStartTime,
+ "Exec allocation and request times don't make sense")
+ assert(info.registrationTime.get > info.requestTime.get,
+ "Exec allocation and request times don't make sense")
+ }
}
+ test("exec alloc decrease.") {
+
+ val testStartTime = System.currentTimeMillis()
+
+ val execCores = 3
+ val conf = new SparkConf()
+ .set(EXECUTOR_CORES, execCores)
+ .set(SCHEDULER_REVIVE_INTERVAL.key, "1m") // don't let it auto revive
during test
+ .set(EXECUTOR_INSTANCES, 0) // avoid errors about duplicate executor
registrations
+ .setMaster(
+
"coarseclustermanager[org.apache.spark.scheduler.TestCoarseGrainedSchedulerBackend]")
+ .setAppName("test")
+ conf.set(TASK_GPU_ID.amountConf, "1")
+ conf.set(EXECUTOR_GPU_ID.amountConf, "1")
+
+ sc = new SparkContext(conf)
+ val execGpu = new ExecutorResourceRequests().cores(1).resource(GPU, 3)
+ val taskGpu = new TaskResourceRequests().cpus(1).resource(GPU, 1)
+ val rp = new ResourceProfile(execGpu.requests, taskGpu.requests)
+ sc.resourceProfileManager.addResourceProfile(rp)
+ assert(rp.id > ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+ val backend =
sc.schedulerBackend.asInstanceOf[TestCoarseGrainedSchedulerBackend]
+ // Note we get two in default profile and one in the new rp
+ // we need to put a req time in for all of them.
+ backend.requestTotalExecutors(Map((rp.id, 1)), Map(), Map())
+ // Decrease the number of execs requested in the new rp.
+ backend.requestTotalExecutors(Map((rp.id, 0)), Map(), Map())
+ // Request execs in the default profile.
+ backend.requestExecutors(3)
+ val mockEndpointRef = mock[RpcEndpointRef]
+ val mockAddress = mock[RpcAddress]
+ when(mockEndpointRef.send(LaunchTask)).thenAnswer((_: InvocationOnMock) =>
{})
+
+ val resources = Map(GPU -> new ResourceInformation(GPU, Array("0", "1",
"3")))
+
+ var executorAddedCount: Int = 0
+ val infos = scala.collection.mutable.ArrayBuffer[ExecutorInfo]()
+ val listener = new SparkListener() {
+ override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded):
Unit = {
+ // Lets check that the exec allocation times "make sense"
+ val info = executorAdded.executorInfo
+ infos += info
+ executorAddedCount += 1
+ }
+ }
+
+ sc.addSparkListener(listener)
+
+ backend.driverEndpoint.askSync[Boolean](
+ RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, Map.empty,
Map.empty, resources,
+ ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
+ backend.driverEndpoint.askSync[Boolean](
+ RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, Map.empty,
Map.empty, resources,
+ ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
+ backend.driverEndpoint.askSync[Boolean](
+ RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, Map.empty,
Map.empty, resources,
+ rp.id))
+
+ val frameSize = RpcUtils.maxMessageSizeBytes(sc.conf)
+ val bytebuffer = java.nio.ByteBuffer.allocate(frameSize - 100)
+ val buffer = new SerializableBuffer(bytebuffer)
+
+ var execResources = backend.getExecutorAvailableResources("1")
+ assert(execResources(GPU).availableAddrs.sorted === Array("0", "1", "3"))
+
+ val exec3ResourceProfileId = backend.getExecutorResourceProfileId("3")
+ assert(exec3ResourceProfileId === rp.id)
+
+ val taskResources = Map(GPU -> new ResourceInformation(GPU, Array("0")))
+ val taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1,
0, "1",
+ "t1", 0, 1, mutable.Map.empty[String, Long],
+ mutable.Map.empty[String, Long], mutable.Map.empty[String, Long],
+ new Properties(), 1, taskResources, bytebuffer)))
+ val ts = backend.getTaskSchedulerImpl()
+ when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]],
any[Boolean])).thenReturn(taskDescs)
+
+ backend.driverEndpoint.send(ReviveOffers)
+
+ eventually(timeout(5 seconds)) {
+ execResources = backend.getExecutorAvailableResources("1")
+ assert(execResources(GPU).availableAddrs.sorted === Array("1", "3"))
+ assert(execResources(GPU).assignedAddrs === Array("0"))
+ }
+
+ // To avoid allocating any resources immediately after releasing the
resource from the task to
+ // make sure that `availableAddrs` below won't change
+ when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]],
any[Boolean])).thenReturn(Seq.empty)
+ backend.driverEndpoint.send(
+ StatusUpdate("1", 1, TaskState.FINISHED, buffer, taskResources))
+
+ eventually(timeout(5 seconds)) {
+ execResources = backend.getExecutorAvailableResources("1")
+ assert(execResources(GPU).availableAddrs.sorted === Array("0", "1", "3"))
+ assert(execResources(GPU).assignedAddrs.isEmpty)
+ }
+ sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis)
+ assert(executorAddedCount === 3)
+ infos.foreach { info =>
+ info.requestTime.map { t =>
+ assert(t > 0,
+ "Exec request times don't make sense")
+ assert(t >= testStartTime,
+ "Exec allocation and request times don't make sense")
+ assert(t >= info.requestTime.get,
+ "Exec allocation and request times don't make sense")
Review comment:
This is always true (I mean both sides reference the same value,
as `info.requestTime.get == info.requestTime.get`).
You probably meant to check the `registrationTime` here (as was done at
the end of the previous test).
##########
File path:
core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
##########
@@ -271,8 +282,128 @@ class CoarseGrainedSchedulerBackendSuite extends
SparkFunSuite with LocalSparkCo
}
sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis)
assert(executorAddedCount === 3)
+ infos.foreach { info =>
+ assert(info.requestTime.get > 0,
+ "Exec allocation and request times don't make sense")
+ assert(info.requestTime.get > testStartTime,
+ "Exec allocation and request times don't make sense")
+ assert(info.registrationTime.get > info.requestTime.get,
+ "Exec allocation and request times don't make sense")
+ }
}
+ test("exec alloc decrease.") {
Review comment:
Nit:
```suggestion
test("decreasing the number of requested executors") {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]