squito commented on a change in pull request #23393: [SPARK-26288][CORE]add 
initRegisteredExecutorsDB
URL: https://github.com/apache/spark/pull/23393#discussion_r263429105
 
 

 ##########
 File path: core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
 ##########
 @@ -243,4 +244,59 @@ class WorkerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfter {
       ExecutorStateChanged("app1", 0, ExecutorState.EXITED, None, None))
     assert(cleanupCalled.get() == value)
   }
+
+  test("removes our metadata of all executors registered for the given " +
+    "application after WorkDirCleanup when config 
spark.shuffle.service.db.enabled=true") {
+    testWorkDirCleanupAndRemoveMetadataWithConfig(true)
+  }
+
+  test("doesn't remove our metadata of all executors registered for the given " 
+
+    "application after WorkDirCleanup when config 
spark.shuffle.service.db.enabled=false") {
+    testWorkDirCleanupAndRemoveMetadataWithConfig(false)
+  }
+
+  private def testWorkDirCleanupAndRemoveMetadataWithConfig(value: Boolean) = {
+    val conf = new SparkConf().set("spark.shuffle.service.db.enabled", 
value.toString)
+    conf.set("spark.worker.cleanup.appDataTtl", 60.toString)
+    conf.set("spark.shuffle.service.enabled", "true")
+
+    val appId = "app1"
+    val execId = "exec1"
+    val cleanupCalled = new AtomicBoolean(false)
+    when(shuffleService.applicationRemoved(any[String])).thenAnswer(new 
Answer[Unit] {
+      override def answer(invocations: InvocationOnMock): Unit = {
+        cleanupCalled.set(true)
+      }
+    })
+    val externalShuffleServiceSupplier = new Supplier[ExternalShuffleService] {
+      override def get: ExternalShuffleService = shuffleService
+    }
+    val worker = makeWorker(conf, externalShuffleServiceSupplier)
+
+    val workDir = Option("/tmp/work").map(new File(_)).get
+    try {
+      workDir.mkdirs()
+      if (!workDir.exists() || !workDir.isDirectory) {
+        logError("Failed to create work directory " + workDir)
+      }
+      assert(workDir.isDirectory)
+    } catch {
+      case e: Exception =>
+        logError("Failed to create work directory " + workDir, e)
+    }
+    // initialize workers
+    worker.workDir = workDir
+    // Create the executor's working directory
+    val executorDir = new File(worker.workDir, appId + "/" + execId)
+
+    if (!executorDir.exists()) {
+      if (!executorDir.mkdirs()) {
 
 Review comment:
   you can combine these conditions into one `if`:
   
   ```scala
   if (!executorDir.exists() && !executorDir.mkdirs()) {
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to