shuyouZZ commented on code in PR #38983:
URL: https://github.com/apache/spark/pull/38983#discussion_r1048340238


##########
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala:
##########
@@ -1705,6 +1705,61 @@ abstract class FsHistoryProviderSuite extends 
SparkFunSuite with Matchers with P
     provider.stop()
   }
 
+  test("SPARK-41447: clean up expired event log files that don't exist in 
listing db") {
+    class TestFsHistoryProvider(conf: SparkConf, clock: Clock)
+      extends FsHistoryProvider(conf, clock) {
+      var doMergeApplicationListingCall = 0
+      override private[history] def doMergeApplicationListing(
+          reader: EventLogFileReader,
+          lastSeen: Long,
+          enableSkipToEnd: Boolean,
+          lastCompactionIndex: Option[Long]): Unit = {
+        super.doMergeApplicationListing(reader, lastSeen, enableSkipToEnd, 
lastCompactionIndex)
+        doMergeApplicationListingCall += 1
+      }
+    }
+
+    val maxAge = TimeUnit.SECONDS.toMillis(10)
+    val clock = new ManualClock(maxAge / 2)
+    val conf = createTestConf().set(MAX_LOG_AGE_S.key, 
s"${maxAge}ms").set(CLEANER_ENABLED, true)
+    val provider = new TestFsHistoryProvider(conf, clock)
+
+    val log1 = newLogFile("app1", Some("attempt1"), inProgress = false)
+    writeFile(log1, None,
+      SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", 
Some("attempt1")),
+      SparkListenerApplicationEnd(2L)
+    )
+    log1.setLastModified(0L)
+
+    val log2 = newLogFile("app1", Some("attempt2"), inProgress = false)
+    writeFile(log2, None,
+      SparkListenerApplicationStart("app1", Some("app1"), 2L, "test", 
Some("attempt2")),
+      SparkListenerApplicationEnd(4L)
+    )
+    log2.setLastModified(clock.getTimeMillis())
+
+    val log3 = newLogFile("app2", Some("attempt1"), inProgress = false)
+    writeFile(log3, None,
+      SparkListenerApplicationStart("app2", Some("app1"), 3L, "test", 
Some("attempt1")),
+      SparkListenerApplicationEnd(4L)
+    )
+    log3.setLastModified(0L)
+
+    provider.getListing().size should be (0)
+
+    // Move the clock forward so log1 and log3 exceed the max age.
+    clock.advance(maxAge)
+    // Avoid unnecessary parse, the expired log files would be cleaned by 
checkForLogs().
+    provider.checkForLogs()
+
+    provider.doMergeApplicationListingCall should be (1)
+    provider.getListing().size should be (1)

Review Comment:
   > This new test code fails here on the master branch, but why it doesn't 
fail at line 1758 ~ 1760? If then, could you explain why this PR claims to 
`clean up expired event log` instead of reducing the number of 
`doMergeApplicationListCall` invocations? I'm wondering if I miss something?
   
   For the first question: when log cleaning is enabled, checkAndCleanLog is 
called after parsing to delete the expired logs, so the master branch doesn't fail here.
   For the second question, I think you are right; it would be better to describe 
this PR as reducing the number of `doMergeApplicationListing` invocations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to