Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/22504#discussion_r228671145
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -796,16 +806,57 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       .asScala
       .toList
     stale.foreach { log =>
-      if (log.appId.isEmpty) {
+      if (log.appId.isEmpty &&
+          (!conf.get(DRIVER_LOG_DFS_DIR).isDefined ||
+           !log.logPath.startsWith(new Path(conf.get(DRIVER_LOG_DFS_DIR).get).toString()))) {
         logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
-        deleteLog(new Path(log.logPath))
+        deleteLog(fs, new Path(log.logPath))
         listing.delete(classOf[LogInfo], log.logPath)
       }
     }
     // Clean the blacklist from the expired entries.
     clearBlacklist(CLEAN_INTERVAL_S)
   }
+  /**
+   * Delete driver logs from the configured spark dfs dir that exceed the configured max age.
+   */
+  private[history] def cleanDriverLogs(): Unit = Utils.tryLog {
+    val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR).get
+    val driverLogFs = new Path(driverLogDir).getFileSystem(hadoopConf)
+    val currentTime = clock.getTimeMillis()
+    val maxTime = currentTime - conf.get(MAX_DRIVER_LOG_AGE_S) * 1000
+    val logFiles = driverLogFs.listLocatedStatus(new Path(driverLogDir))
+    while (logFiles.hasNext()) {
--- End diff ---
One issue here is that since you're basing this on the file system's contents, if these files are deleted outside of the SHS then you'll accumulate `LogInfo` entries for those deleted files.
The event log cleaner avoids that by basing its logic on the SHS's view of the file system, although I don't know whether that same approach can be applied here.
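
To make the concern concrete, here is a minimal, self-contained sketch (not the PR's code and not the actual `FsHistoryProvider` internals) of what expiring driver logs from a tracked listing, rather than from a raw directory scan, could look like. `KnownLog`, `store`, and `cleanDriverLogsFromListing` are made-up stand-ins for the SHS's `LogInfo` entries and its KVStore-backed listing:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.collection.mutable

// Hypothetical stand-in for a LogInfo entry tracked by the SHS listing.
case class KnownLog(logPath: String, lastProcessed: Long)

// Expire logs based on the SHS's own view (the tracked entries), not on a
// fresh listing of the directory. Entries whose files were deleted outside
// the SHS are still dropped from the listing instead of accumulating.
def cleanDriverLogsFromListing(
    store: mutable.Map[String, KnownLog],
    fs: FileSystem,
    maxTime: Long): Unit = {
  store.values.filter(_.lastProcessed < maxTime).toList.foreach { log =>
    val path = new Path(log.logPath)
    // Delete the file if it is still there; if it was removed externally,
    // just forget the entry.
    if (fs.exists(path)) {
      fs.delete(path, true)
    }
    store.remove(log.logPath)
  }
}
```

Whether something like `lastProcessed` can be kept up to date cheaply for driver logs is exactly the open question above.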
---