ivoson commented on code in PR #39410:
URL: https://github.com/apache/spark/pull/39410#discussion_r1064167770
##########
core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala:
##########
@@ -403,6 +405,92 @@ class CoarseGrainedSchedulerBackendSuite extends
SparkFunSuite with LocalSparkCo
"Our unexpected executor does not have a request time.")
}
+ test("SPARK-41848: executor core decrease should base on taskCpus") {
+ val testStartTime = System.currentTimeMillis()
+
+ val execCores = 3
+ val conf = new SparkConf()
+ .set(EXECUTOR_CORES, execCores)
+ .set(SCHEDULER_REVIVE_INTERVAL.key, "1m") // don't let it auto revive
during test
+ .set(EXECUTOR_INSTANCES, 0) // avoid errors about duplicate executor
registrations
+ .setMaster(
+
"coarseclustermanager[org.apache.spark.scheduler.TestCoarseGrainedSchedulerBackend]")
+ .setAppName("test")
+
+ sc = new SparkContext(conf)
+
+ val backend =
sc.schedulerBackend.asInstanceOf[TestCoarseGrainedSchedulerBackend]
+ // Request execs in the default profile.
+ backend.requestExecutors(1)
+ val mockEndpointRef = mock[RpcEndpointRef]
+ val mockAddress = mock[RpcAddress]
+ when(mockEndpointRef.send(LaunchTask)).thenAnswer((_: InvocationOnMock) =>
{})
+
+ var executorAddedCount: Int = 0
+ val infos = scala.collection.mutable.ArrayBuffer[ExecutorInfo]()
+ val listener = new SparkListener() {
+ override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded):
Unit = {
+ // Let's check that the exec allocation times "make sense"
+ val info = executorAdded.executorInfo
+ infos += info
+ executorAddedCount += 1
+ }
+ }
+
+ sc.addSparkListener(listener)
+
+ val ts = backend.getTaskSchedulerImpl()
+ when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]],
any[Boolean])).thenReturn(Seq.empty)
+ backend.driverEndpoint.askSync[Boolean](
+ RegisterExecutor("1", mockEndpointRef, mockAddress.host, execCores,
Map.empty, Map.empty,
+ Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
+ backend.driverEndpoint.send(LaunchedExecutor("1"))
+ eventually(timeout(5 seconds)) {
+ assert(backend.getExecutorAvailableCpus("1").contains(3))
+ }
+
+ val frameSize = RpcUtils.maxMessageSizeBytes(sc.conf)
+ val bytebuffer = java.nio.ByteBuffer.allocate(frameSize - 100)
+ val buffer = new SerializableBuffer(bytebuffer)
+
+ val defaultRp = ResourceProfile.getOrCreateDefaultProfile(conf)
+ assert(ResourceProfile.getTaskCpusOrDefaultForProfile(defaultRp, conf) ==
1)
+ // Task cpus can be different from default resource profile when
TaskResourceProfile is used.
+ val taskCpus = 2
+ val taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1,
0, "1",
+ "t1", 0, 1, mutable.Map.empty[String, Long],
+ mutable.Map.empty[String, Long], mutable.Map.empty[String, Long],
+ new Properties(), taskCpus, Map.empty, bytebuffer)))
+ when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]],
any[Boolean])).thenReturn(taskDescs)
+
+ backend.driverEndpoint.send(ReviveOffers)
+
+ eventually(timeout(5 seconds)) {
+ assert(backend.getExecutorAvailableCpus("1").contains(1))
+ }
+
+ // To avoid allocating any resources immediately after releasing the
resource from the task to
+ // make sure that `availableAddrs` below won't change
Review Comment:
Thanks, fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]