squito commented on a change in pull request #23393: [SPARK-26288][CORE] add initRegisteredExecutorsDB
URL: https://github.com/apache/spark/pull/23393#discussion_r263430434
 
 

 ##########
 File path: core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
 ##########
 @@ -243,4 +244,59 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
       ExecutorStateChanged("app1", 0, ExecutorState.EXITED, None, None))
     assert(cleanupCalled.get() == value)
   }
+
+  test("removes our metadata of all executors registered for the given " +
+    "application after WorkDirCleanup when config 
spark.shuffle.service.db.enabled=true") {
+    testWorkDirCleanupAndRemoveMetadataWithConfig(true)
+  }
+
+  test("don't removes our metadata of all executors registered for the given " 
+
+    "application after WorkDirCleanup when config 
spark.shuffle.service.db.enabled=false") {
+    testWorkDirCleanupAndRemoveMetadataWithConfig(false)
+  }
+
+  private def testWorkDirCleanupAndRemoveMetadataWithConfig(value: Boolean) = {
+    val conf = new SparkConf().set("spark.shuffle.service.db.enabled", value.toString)
+    conf.set("spark.worker.cleanup.appDataTtl", 60.toString)
+    conf.set("spark.shuffle.service.enabled", "true")
+
+    val appId = "app1"
+    val execId = "exec1"
+    val cleanupCalled = new AtomicBoolean(false)
+    when(shuffleService.applicationRemoved(any[String])).thenAnswer(new Answer[Unit] {
+      override def answer(invocations: InvocationOnMock): Unit = {
+        cleanupCalled.set(true)
+      }
+    })
+    val externalShuffleServiceSupplier = new Supplier[ExternalShuffleService] {
+      override def get: ExternalShuffleService = shuffleService
+    }
+    val worker = makeWorker(conf, externalShuffleServiceSupplier)
+
+    val workDir = new File("/tmp/work")
+    try {
+      workDir.mkdirs()
+    } catch {
+      case e: Exception =>
+        logError("Failed to create work directory " + workDir, e)
+    }
+    assert(workDir.isDirectory, "Failed to create work directory " + workDir)
+    // point the worker at the test work directory
+    worker.workDir = workDir
+    // Create the executor's working directory
+    val executorDir = new File(worker.workDir, appId + "/" + execId)
+
+    if (!executorDir.exists() && !executorDir.mkdirs()) {
+      throw new IOException("Failed to create directory " + executorDir)
+    }
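+    // Backdate the executor dir past the 60s appDataTtl so WorkDirCleanup treats it as expired.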
+    executorDir.setLastModified(System.currentTimeMillis - (1000 * 120))
+    worker.receive(WorkDirCleanup)
+    Thread.sleep(10)
 
 Review comment:
   Using sleep() in tests to wait for things isn't great: it either leads to flakiness if you don't sleep long enough and the test is occasionally slow, or, if you make it super long, it makes the tests slow.
   
   Ideally there would be a condition variable you could wait on, but that's probably not worth it here. Instead, scalatest's `eventually` works well, e.g. like this:
   
   https://github.com/apache/spark/blob/315c95c39953f677220aebc4592ad434019005c0/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala#L279-L282
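   
   For concreteness, here's a minimal sketch of what that might look like in this test; the timeout and interval values are illustrative, and the imports assume scalatest's `Eventually` and `SpanSugar` helpers:
   
   ```scala
   import org.scalatest.concurrent.Eventually._
   import org.scalatest.time.SpanSugar._
   
   worker.receive(WorkDirCleanup)
   // Retry the assertion until it passes or the timeout elapses, instead of
   // sleeping for a fixed interval and hoping the cleanup has run by then.
   eventually(timeout(10.seconds), interval(10.milliseconds)) {
     assert(cleanupCalled.get() == value)
   }
   ```
   
   (In the `value = false` case the assertion holds immediately, so `eventually` only really strengthens the `true` case; neither sleeping nor polling can prove the cleanup *never* fires.)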
