weixiuli commented on a change in pull request #23393: [SPARK-26288][CORE] add initRegisteredExecutorsDB
URL: https://github.com/apache/spark/pull/23393#discussion_r245964512
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
 ##########
 @@ -56,11 +60,55 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
 
   private var server: TransportServer = _
 
+  private final val MAX_DIR_CREATION_ATTEMPTS = 10
+
   private val shuffleServiceSource = new ExternalShuffleServiceSource
 
+  protected def createDirectory(root: String, name: String): File = {
+    var attempts = 0
+    val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
+    var dir: File = null
+    while (dir == null) {
+      attempts += 1
+      if (attempts > maxAttempts) {
+        throw new IOException("Failed to create a temp directory (under " + root + ") after " +
+          maxAttempts + " attempts!")
+      }
+      try {
+        dir = new File(root, "registeredExecutors")
+        if (!dir.exists() && !dir.mkdirs()) {
+          dir = null
+        }
+      } catch { case _: SecurityException => dir = null }
+    }
+    logInfo(s"registeredExecutorsDb path is ${dir.getAbsolutePath}")
+    new File(dir.getAbsolutePath, name)
+  }
+
+  protected def initRegisteredExecutorsDB(dbName: String): File = {
+    val localDirs = sparkConf.get("spark.local.dir", "").split(",")
+    if (localDirs.length >= 1 && !"".equals(localDirs(0))) {
+      createDirectory(localDirs(0), dbName)
 
 Review comment:
   Hi @squito, as you agreed, it certainly should be possible to do this.
   `the one which comes to mind is, what happens if an application is stopped while the external shuffle service is down?  In yarn, we rely on being told the application was stopped even after the NM comes back.`
   Right now, a case like the one you describe can leave an entry in the DB forever. As you said, maybe this is rare enough and low-impact enough, but it is at least worth thinking through and documenting. So I think we can add some code that removes the stale entry alongside WorkDirCleanup when `spark.worker.cleanup.enabled=true` is set in standalone mode; a rough sketch follows. Do you have a better idea?
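   To make that concrete, here is a minimal, self-contained sketch of the idea, not the actual patch: it walks the worker's work dir the way WorkDirCleanup does and drops DB entries for application directories older than the TTL. `removeAppFromDb` is a hypothetical hook standing in for whatever the shuffle service would actually expose.

   ```scala
   import java.io.File
   import java.util.concurrent.TimeUnit

   object StaleDbEntryCleanup {
     // Sketch only: remove registeredExecutorsDB entries for applications whose
     // work directories are older than the app-data TTL, mirroring what
     // WorkDirCleanup does for executor directories in standalone mode.
     def cleanStaleEntries(
         workDir: File,
         appDataTtlMs: Long,
         removeAppFromDb: String => Unit): Unit = {
       val cutoff = System.currentTimeMillis() - appDataTtlMs
       Option(workDir.listFiles()).getOrElse(Array.empty[File])
         .filter(dir => dir.isDirectory && dir.lastModified() < cutoff)
         .foreach(dir => removeAppFromDb(dir.getName)) // dir name is the app id
     }

     def main(args: Array[String]): Unit = {
       // 7 days matches the default of spark.worker.cleanup.appDataTtl.
       val ttlMs = TimeUnit.DAYS.toMillis(7)
       cleanStaleEntries(new File("/tmp/spark-work"), ttlMs,
         appId => println(s"would remove $appId from registeredExecutorsDB"))
     }
   }
   ```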
   
   This commit uses `localDirs(0)` instead of checking all the local dirs, so the DB always lives at the same path and `initRegisteredExecutorsDB` keeps working across restarts; `localDirs(0)` is simply reused for the DB rather than introducing an additional setting.
   
   Creating a path like `[local-dir]/registeredExecutors/registeredExecutors.ldb` just makes the layout clearer (see the snippet below).
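   For illustration only (not part of the patch), this is how the DB file resolves from `spark.local.dir` with this change; the first entry is always used, so every restart finds the same path:

   ```scala
   import java.io.File
   import org.apache.spark.SparkConf

   object DbPathExample {
     def main(args: Array[String]): Unit = {
       // Two local dirs configured; only the first is used for the DB.
       val conf = new SparkConf().set("spark.local.dir", "/data1/tmp,/data2/tmp")
       val localDirs = conf.get("spark.local.dir", "").split(",")
       val dbFile = new File(new File(localDirs(0), "registeredExecutors"),
         "registeredExecutors.ldb")
       println(dbFile.getPath) // /data1/tmp/registeredExecutors/registeredExecutors.ldb
     }
   }
   ```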