Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19770#discussion_r154816346
--- Diff:
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
---
@@ -658,6 +659,103 @@ class FsHistoryProviderSuite extends SparkFunSuite
with BeforeAndAfter with Matc
freshUI.get.ui.store.job(0)
}
+ /**
+ * Validate aggressive clean up removes incomplete or corrupt history files that would
+ * otherwise be missed during clean up. Also validate no behavior change if aggressive
+ * clean up is disabled.
+ *
+ * Named arguments are used for every call because five of the seven parameters are
+ * boolean flags that are indistinguishable when passed positionally.
+ */
+ test("SPARK-21571: aggressive clean up removes incomplete history files") {
+ // Files are older than maxAge and aggressive clean up is on: all four test files are
+ // expected to be removed.
+ createCleanAndCheckIncompleteLogFiles(
+ maxAgeDays = 14,
+ lastModifiedDaysAgo = 21,
+ aggressiveCleanup = true,
+ expectEmptyInprogressRemoved = true,
+ expectEmptyCorruptRemoved = true,
+ expectNonEmptyInprogressRemoved = true,
+ expectNonEmptyCorruptRemoved = true)
+
+ // Files are older than maxAge but aggressive clean up is off: only the non-empty
+ // in-progress file is expected to be removed.
+ createCleanAndCheckIncompleteLogFiles(
+ maxAgeDays = 14,
+ lastModifiedDaysAgo = 21,
+ aggressiveCleanup = false,
+ expectEmptyInprogressRemoved = false,
+ expectEmptyCorruptRemoved = false,
+ expectNonEmptyInprogressRemoved = true,
+ expectNonEmptyCorruptRemoved = false)
+
+ // Files are younger than maxAge with aggressive clean up on: nothing is expected
+ // to be removed.
+ createCleanAndCheckIncompleteLogFiles(
+ maxAgeDays = 14,
+ lastModifiedDaysAgo = 7,
+ aggressiveCleanup = true,
+ expectEmptyInprogressRemoved = false,
+ expectEmptyCorruptRemoved = false,
+ expectNonEmptyInprogressRemoved = false,
+ expectNonEmptyCorruptRemoved = false)
+
+ // Files are younger than maxAge with aggressive clean up off: nothing is expected
+ // to be removed.
+ createCleanAndCheckIncompleteLogFiles(
+ maxAgeDays = 14,
+ lastModifiedDaysAgo = 7,
+ aggressiveCleanup = false,
+ expectEmptyInprogressRemoved = false,
+ expectEmptyCorruptRemoved = false,
+ expectNonEmptyInprogressRemoved = false,
+ expectNonEmptyCorruptRemoved = false)
+ }
+
+ /**
+ * Create four test incomplete/corrupt history files and invoke a check
and clean cycle that
+ * passes followed by one occurring after the max age days retention
window and assert the
+ * expected number of history files remain.
+ * @param maxAgeDays maximum retention in days, used to simulate current
time
+ * @param lastModifiedDaysAgo last modified date for test files relative
to current time
+ * @param aggressiveCleanup aggressive clean up is enabled or not
+ * @param expectEmptyInprogressRemoved expect an empty inprogress file
to be removed
+ * @param expectEmptyCorruptRemoved expect an empty corrupt complete
file to be removed
+ * @param expectNonEmptyInprogressRemoved expect a non-empty inprogress
file to be removed
+ * @param expectNonEmptyCorruptRemoved expect a non-empty corrupt
complete file to be removed
+ */
+ private def createCleanAndCheckIncompleteLogFiles(
+ maxAgeDays: Long,
+ lastModifiedDaysAgo: Long,
+ aggressiveCleanup: Boolean,
+ expectEmptyInprogressRemoved: Boolean,
+ expectEmptyCorruptRemoved: Boolean,
+ expectNonEmptyInprogressRemoved: Boolean,
+ expectNonEmptyCorruptRemoved: Boolean) = {
+ // Set current time as 2 * maximum retention period to allow for
expired history files.
+ val currentTimeMillis = MILLISECONDS.convert(maxAgeDays * 2,
TimeUnit.DAYS)
+ val clock = new ManualClock(currentTimeMillis)
+
+ val lastModifiedTime = currentTimeMillis -
+ MILLISECONDS.convert(lastModifiedDaysAgo, TimeUnit.DAYS)
+
+ val provider = new FsHistoryProvider(
+ createTestConf()
+ .set("spark.history.fs.cleaner.aggressive",
s"${aggressiveCleanup}")
+ .set("spark.history.fs.cleaner.maxAge", s"${maxAgeDays}d")
+ .set("spark.testing", "true"),
+ clock) {
+ override def getNewLastScanTime(): Long = clock.getTimeMillis
+ }
+
+ // Create history files
+ // 1. 0-byte size files inprogress and corrupt complete files
+ // 2. >0 byte size files inprogress and corrupt complete files
+
+ try {
+ val logfile1 = newLogFile("emptyInprogressLogFile", None, inProgress
= true)
+ logfile1.createNewFile
+ logfile1.setLastModified(lastModifiedTime)
+
+ val logfile2 = newLogFile("emptyCorruptLogFile", None, inProgress =
false)
+ logfile2.createNewFile
+ logfile2.setLastModified(lastModifiedTime)
+
+ // Create an inprogress log file, has only start record.
+ val logfile3 = newLogFile("nonEmptyInprogressLogFile", None,
inProgress = true)
+ writeFile(logfile3, true, None, SparkListenerApplicationStart(
+ "inProgress1", Some("inProgress1"), 3L, "test", Some("attempt1"))
+ )
+ logfile3.setLastModified(lastModifiedTime)
+
+ // Create an incomplete log file, has an end record but no start
record.
+ val logfile4 = newLogFile("nonEmptyCorruptLogFile", None, inProgress
= false)
+ writeFile(logfile4, true, None, SparkListenerApplicationEnd(0))
+ logfile4.setLastModified(lastModifiedTime)
+
+ // Simulate checking logs 1 day after initial creation. This is
necessary because the log
+ // checker will sometimes use the current time in place of last
modified time the first
+ // time it encounters an inprogress file to work around certain file
system inconsistencies.
+ // No history files should clean up in first check and clean pass.
+ clock.setTime(lastModifiedTime + MILLISECONDS.convert(1,
TimeUnit.DAYS))
+ provider.checkForLogs
--- End diff --
Add `()` to zero-argument, side-effecting method calls, e.g. `provider.checkForLogs()` and `logfile1.createNewFile()` (also in other places).
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]