dongjoon-hyun commented on code in PR #38983:
URL: https://github.com/apache/spark/pull/38983#discussion_r1048364205
##########
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala:
##########
@@ -1705,6 +1705,61 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
     provider.stop()
   }
 
+  test("SPARK-41447: clean up expired event log files that don't exist in listing db") {
+    class TestFsHistoryProvider(conf: SparkConf, clock: Clock)
+      extends FsHistoryProvider(conf, clock) {
+      var doMergeApplicationListingCall = 0
+      override private[history] def doMergeApplicationListing(
+          reader: EventLogFileReader,
+          lastSeen: Long,
+          enableSkipToEnd: Boolean,
+          lastCompactionIndex: Option[Long]): Unit = {
+        super.doMergeApplicationListing(reader, lastSeen, enableSkipToEnd, lastCompactionIndex)
+        doMergeApplicationListingCall += 1
+      }
+    }
+
+    val maxAge = TimeUnit.SECONDS.toMillis(10)
+    val clock = new ManualClock(maxAge / 2)
+    val conf = createTestConf().set(MAX_LOG_AGE_S.key, s"${maxAge}ms").set(CLEANER_ENABLED, true)
+    val provider = new TestFsHistoryProvider(conf, clock)
+
+    val log1 = newLogFile("app1", Some("attempt1"), inProgress = false)
+    writeFile(log1, None,
+      SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
+      SparkListenerApplicationEnd(2L)
+    )
+    log1.setLastModified(0L)
+
+    val log2 = newLogFile("app1", Some("attempt2"), inProgress = false)
+    writeFile(log2, None,
+      SparkListenerApplicationStart("app1", Some("app1"), 2L, "test", Some("attempt2")),
+      SparkListenerApplicationEnd(4L)
+    )
+    log2.setLastModified(clock.getTimeMillis())
+
+    val log3 = newLogFile("app2", Some("attempt1"), inProgress = false)
+    writeFile(log3, None,
+      SparkListenerApplicationStart("app2", Some("app1"), 3L, "test", Some("attempt1")),
+      SparkListenerApplicationEnd(4L)
+    )
+    log3.setLastModified(0L)
+
+    provider.getListing().size should be (0)
+
+    // Move the clock forward so log1 and log3 exceed the max age.
+    clock.advance(maxAge)
+    // Avoid an unnecessary parse; the expired log files should be cleaned up by checkForLogs().
+    provider.checkForLogs()
+
+    provider.doMergeApplicationListingCall should be (1)
+    provider.getListing().size should be (1)
Review Comment:
I want to help you by merging this PR. If `deleting expired logs` is already
handled by other logic in Apache Spark, please revise the PR title and
description accordingly. That will make it easier for me to merge this PR.