This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ef2e730662f2 [SPARK-46664][CORE] Improve `Master` to recover quickly 
in case of zero workers and apps
ef2e730662f2 is described below

commit ef2e730662f270e83a87a2395dfbf30a919b25c2
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Wed Jan 10 16:50:51 2024 -0800

    [SPARK-46664][CORE] Improve `Master` to recover quickly in case of zero 
workers and apps
    
    ### What changes were proposed in this pull request?
    
    This PR aims to improve `Master` to recover quickly in case of zero workers 
and apps.
    
    ### Why are the changes needed?
    
    This case happens on the initial cluster creation or during cluster 
restarting procedure.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs with the newly added test case.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44673 from dongjoon-hyun/SPARK-46664.
    
    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/deploy/master/Master.scala    | 23 ++++++++++++++-------
 .../apache/spark/deploy/master/MasterSuite.scala   | 24 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index c8679c185ad7..696ce05bce48 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -244,12 +244,13 @@ private[deploy] class Master(
       }
       logInfo("I have been elected leader! New state: " + state)
       if (state == RecoveryState.RECOVERING) {
-        beginRecovery(storedApps, storedDrivers, storedWorkers)
-        recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
-          override def run(): Unit = Utils.tryLogNonFatalError {
-            self.send(CompleteRecovery)
-          }
-        }, recoveryTimeoutMs, TimeUnit.MILLISECONDS)
+        if (beginRecovery(storedApps, storedDrivers, storedWorkers)) {
+          recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
+            override def run(): Unit = Utils.tryLogNonFatalError {
+              self.send(CompleteRecovery)
+            }
+          }, recoveryTimeoutMs, TimeUnit.MILLISECONDS)
+        }
       }
 
     case CompleteRecovery => completeRecovery()
@@ -590,7 +591,7 @@ private[deploy] class Master(
   private var recoveryStartTimeNs = 0L
 
   private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: 
Seq[DriverInfo],
-      storedWorkers: Seq[WorkerInfo]): Unit = {
+      storedWorkers: Seq[WorkerInfo]): Boolean = {
     recoveryStartTimeNs = System.nanoTime()
     for (app <- storedApps) {
       logInfo("Trying to recover app: " + app.id)
@@ -619,6 +620,14 @@ private[deploy] class Master(
         case e: Exception => logInfo("Worker " + worker.id + " had exception 
on reconnect")
       }
     }
+
+    // In case of zero workers and apps, we can complete recovery.
+    if (canCompleteRecovery) {
+      completeRecovery()
+      false
+    } else {
+      true
+    }
   }
 
   private def completeRecovery(): Unit = {
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index e15a5db770eb..6490ee3e8241 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -250,6 +250,30 @@ class MasterSuite extends SparkFunSuite
     CustomRecoveryModeFactory.instantiationAttempts should be > 
instantiationAttempts
   }
 
+  test("SPARK-46664: master should recover quickly in case of zero workers and 
apps") {
+    val conf = new SparkConf(loadDefaults = false)
+    conf.set(RECOVERY_MODE, "CUSTOM")
+    conf.set(RECOVERY_MODE_FACTORY, 
classOf[FakeRecoveryModeFactory].getCanonicalName)
+    conf.set(MASTER_REST_SERVER_ENABLED, false)
+
+    var master: Master = null
+    try {
+      master = makeMaster(conf)
+      master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
+      eventually(timeout(2.seconds), interval(100.milliseconds)) {
+        getState(master) should be(RecoveryState.ALIVE)
+      }
+      master.workers.size should be(0)
+    } finally {
+      if (master != null) {
+        master.rpcEnv.shutdown()
+        master.rpcEnv.awaitTermination()
+        master = null
+        FakeRecoveryModeFactory.persistentData.clear()
+      }
+    }
+  }
+
   test("master correctly recover the application") {
     val conf = new SparkConf(loadDefaults = false)
     conf.set(RECOVERY_MODE, "CUSTOM")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to