Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/20138#discussion_r161082423
--- Diff:
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -405,49 +404,70 @@ private[history] class FsHistoryProvider(conf:
SparkConf, clock: Clock)
try {
val newLastScanTime = getNewLastScanTime()
logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
- // scan for modified applications, replay and merge them
- val logInfos = Option(fs.listStatus(new
Path(logDir))).map(_.toSeq).getOrElse(Nil)
+
+ val updated = Option(fs.listStatus(new
Path(logDir))).map(_.toSeq).getOrElse(Nil)
.filter { entry =>
!entry.isDirectory() &&
// FsHistoryProvider generates a hidden file which can't be
read. Accidentally
// reading a garbage file is safe, but we would log an error
which can be scary to
// the end-user.
!entry.getPath().getName().startsWith(".") &&
- SparkHadoopUtil.get.checkAccessPermission(entry,
FsAction.READ) &&
- recordedFileSize(entry.getPath()) < entry.getLen()
+ SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
+ }
+ .filter { entry =>
+ try {
+ val info = listing.read(classOf[LogInfo],
entry.getPath().toString())
+ if (info.fileSize < entry.getLen()) {
+ // Log size has changed, it should be parsed.
+ true
+ } else {
+ // If the SHS view has a valid application, update the time
the file was last seen so
+ // that the entry is not deleted from the SHS listing.
+ if (info.appId.isDefined) {
+ listing.write(info.copy(lastProcessed = newLastScanTime))
+ }
+ false
+ }
+ } catch {
+ case _: NoSuchElementException =>
+ // If the file is currently not being tracked by the SHS,
add an entry for it and try
+ // to parse it. This will allow the cleaner code to detect
the file as stale later on
+ // if it was not possible to parse it.
+ listing.write(LogInfo(entry.getPath().toString(),
newLastScanTime, None, None,
+ entry.getLen()))
+ entry.getLen() > 0
+ }
}
.sortWith { case (entry1, entry2) =>
entry1.getModificationTime() > entry2.getModificationTime()
}
- if (logInfos.nonEmpty) {
- logDebug(s"New/updated attempts found: ${logInfos.size}
${logInfos.map(_.getPath)}")
+ if (updated.nonEmpty) {
+ logDebug(s"New/updated attempts found: ${updated.size}
${updated.map(_.getPath)}")
}
- var tasks = mutable.ListBuffer[Future[_]]()
-
- try {
- for (file <- logInfos) {
- tasks += replayExecutor.submit(new Runnable {
- override def run(): Unit = mergeApplicationListing(file)
+ val tasks = updated.map { entry =>
+ try {
+ replayExecutor.submit(new Runnable {
+ override def run(): Unit = mergeApplicationListing(entry,
newLastScanTime)
})
+ } catch {
+ // let the iteration over logInfos break, since an exception on
--- End diff --
You've renamed `logInfos` to `updated`, so the comment on this line still refers to the old variable name and should be updated to match.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]