GitHub user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/22504#discussion_r230463888
--- Diff:
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -812,18 +821,74 @@ private[history] class FsHistoryProvider(conf:
SparkConf, clock: Clock)
.reverse()
.first(maxTime)
.asScala
+ .filter(l => l.logType == LogType.EventLogs)
.toList
stale.foreach { log =>
if (log.appId.isEmpty) {
logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
- deleteLog(new Path(log.logPath))
+ deleteLog(fs, new Path(log.logPath))
listing.delete(classOf[LogInfo], log.logPath)
}
}
// Clean the blacklist from the expired entries.
clearBlacklist(CLEAN_INTERVAL_S)
}
+ /**
+ * Delete driver logs from the configured spark dfs dir that exceed the
configured max age
+ */
+ private[history] def cleanDriverLogs(): Unit = Utils.tryLog {
+ val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR).get
+ val driverLogFs = new Path(driverLogDir).getFileSystem(hadoopConf)
+ val currentTime = clock.getTimeMillis()
+ val maxTime = currentTime - conf.get(MAX_DRIVER_LOG_AGE_S) * 1000
+ val logFiles = driverLogFs.listLocatedStatus(new Path(driverLogDir))
+ while (logFiles.hasNext()) {
+ val f = logFiles.next()
+ // Do not rely on 'modtime' as it is not updated for all filesystems
when files are written to
+ val deleteFile =
+ try {
+ val info = listing.read(classOf[LogInfo], f.getPath().toString())
+ // Update the lastprocessedtime of file if it's length or
modification time has changed
+ if (info.fileSize < f.getLen() || info.lastProcessed <
f.getModificationTime()) {
+ listing.write(
+ info.copy(lastProcessed = currentTime, fileSize =
f.getLen()))
+ false
+ } else if (info.lastProcessed > maxTime) {
+ false
+ } else {
+ true
+ }
+ } catch {
+ case e: NoSuchElementException =>
+ // For every new driver log file discovered, create a new
entry in listing
+ listing.write(LogInfo(f.getPath().toString(), currentTime,
LogType.DriverLogs, None,
+ None, f.getLen()))
+ false
+ }
+ if (deleteFile) {
+ logInfo(s"Deleting expired driver log for:
${f.getPath().getName()}")
+ listing.delete(classOf[LogInfo], f.getPath().toString())
+ deleteLog(driverLogFs, f.getPath())
+ }
+ }
+
+ // Delete driver log file entries that exceed the configured max age
and
+ // may have been deleted on filesystem externally.
+ val stale = listing.view(classOf[LogInfo])
+ .index("lastProcessed")
+ .reverse()
+ .first(maxTime)
+ .asScala
+ .filter(i => i.logType == LogType.DriverLogs)
--- End diff --
Nit: write this filter with a braced closure, i.e. `.filter { i => ... }`.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]