ulysses-you commented on code in PR #5141:
URL: https://github.com/apache/kyuubi/pull/5141#discussion_r1286529262


##########
extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala:
##########
@@ -192,26 +192,55 @@ case class FinalStageResourceManager(session: SparkSession)
       return
     }
 
-    // Note, `SparkContext#killExecutors` does not allow with DRA enabled,
-    // see `https://github.com/apache/spark/pull/20604`.
-    // It may cause the status in `ExecutorAllocationManager` inconsistent with
-    // `CoarseGrainedSchedulerBackend` for a while. But it should be synchronous finally.
-    //
-    // We should adjust target num executors, otherwise `YarnAllocator` might re-request original
-    // target executors if DRA has not updated target executors yet.
-    // Note, DRA would re-adjust executors if there are more tasks to be executed, so we are safe.
-    //
-    //  * We kill executor
-    //      * YarnAllocator re-request target executors
-    //         * DRA can not release executors since they are new added
-    // ----------------------------------------------------------------> timeline
+    val startTime = System.currentTimeMillis()
+    val deadline = startTime +
+      conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_MAX_WAIT_TIME_BEFORE_KILL)
+    var draTargetExecutors =
+      getDraTargetExecutors(sc, executorAllocationClient).getOrElse(0)
+    while (System.currentTimeMillis() < deadline && draTargetExecutors > targetExecutors) {
+        Thread.sleep(100)
+        draTargetExecutors =
+            getDraTargetExecutors(sc, executorAllocationClient).getOrElse(0)
+    }
+    logInfo(s"DRA target executor number changed to $draTargetExecutors in " +
+      s"${System.currentTimeMillis() - startTime} ms.")
+
+    if (draTargetExecutors <= targetExecutors) {
+      // Ensure target executor number has been updated in cluster manager client
+      executorAllocationClient.requestExecutors(0)
+    }
+
     executorAllocationClient.killExecutors(
       executorIds = executorsToKill,
-      adjustTargetNumExecutors = true,
+      adjustTargetNumExecutors = false,
       countFailures = false,
       force = false)

Review Comment:
   Shall we call `client.requestTotalExecutors` to adjust the target executor number if `targetExecutors` < `draTargetExecutors` after killing the executors?
   
   Otherwise we might hit an issue similar to https://github.com/apache/spark/pull/19048.
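   A minimal sketch of the suggested ordering follows, under stated assumptions. The code first waits, with a deadline, for the DRA target to drop. It then kills executors with `adjustTargetNumExecutors = false`. Finally, if DRA still targets more executors than `targetExecutors`, it explicitly requests the lower total. `AllocationClient`, `requestTotal`, `kill`, `RecordingClient`, `waitForTarget` and `killAndReconcile` are all hypothetical names used for illustration. They are not Spark or Kyuubi APIs. Spark's real `ExecutorAllocationClient#requestTotalExecutors` is `private[spark]` and in Spark 3.x takes per-resource-profile maps, so this sketch models the client with a simplified trait.

```scala
// Hypothetical, simplified stand-in for Spark's ExecutorAllocationClient (NOT the real API).
trait AllocationClient {
  def requestTotal(total: Int): Boolean
  def kill(ids: Seq[String], adjustTarget: Boolean): Seq[String]
}

// Hypothetical test double that records every requested total.
final class RecordingClient extends AllocationClient {
  var requestedTotals: List[Int] = Nil
  def requestTotal(total: Int): Boolean = { requestedTotals = requestedTotals :+ total; true }
  def kill(ids: Seq[String], adjustTarget: Boolean): Seq[String] = ids
}

object FinalStageKillSketch {
  /** Polls `current` until it is <= `target` or `maxWaitMs` elapses; returns the last value seen. */
  def waitForTarget(current: () => Int, target: Int, maxWaitMs: Long, intervalMs: Long = 100L): Int = {
    val deadline = System.currentTimeMillis() + maxWaitMs
    var value = current()
    while (System.currentTimeMillis() < deadline && value > target) {
      Thread.sleep(intervalMs)
      value = current()
    }
    value
  }

  /** Kills without touching the target, then lowers the target if DRA still asks for more. */
  def killAndReconcile(
      client: AllocationClient,
      executorsToKill: Seq[String],
      targetExecutors: Int,
      draTargetExecutors: Int): Seq[String] = {
    val killed = client.kill(executorsToKill, adjustTarget = false)
    // Reviewer's suggestion: push the lower total explicitly so the cluster manager
    // does not re-request executors based on a stale, higher DRA target.
    if (targetExecutors < draTargetExecutors) {
      client.requestTotal(targetExecutors)
    }
    killed
  }
}
```

   The explicit `requestTotal` call is only made when the wait timed out before DRA caught up. When DRA has already lowered its target, no extra request is sent.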


