Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/20138#discussion_r161095561
--- Diff:
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -405,49 +404,70 @@ private[history] class FsHistoryProvider(conf:
SparkConf, clock: Clock)
try {
val newLastScanTime = getNewLastScanTime()
logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
- // scan for modified applications, replay and merge them
- val logInfos = Option(fs.listStatus(new
Path(logDir))).map(_.toSeq).getOrElse(Nil)
+
+ val updated = Option(fs.listStatus(new
Path(logDir))).map(_.toSeq).getOrElse(Nil)
.filter { entry =>
!entry.isDirectory() &&
// FsHistoryProvider generates a hidden file which can't be
read. Accidentally
// reading a garbage file is safe, but we would log an error
which can be scary to
// the end-user.
!entry.getPath().getName().startsWith(".") &&
- SparkHadoopUtil.get.checkAccessPermission(entry,
FsAction.READ) &&
- recordedFileSize(entry.getPath()) < entry.getLen()
+ SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
+ }
+ .filter { entry =>
+ try {
+ val info = listing.read(classOf[LogInfo],
entry.getPath().toString())
+ if (info.fileSize < entry.getLen()) {
+ // Log size has changed, it should be parsed.
+ true
+ } else {
+ // If the SHS view has a valid application, update the time
the file was last seen so
+ // that the entry is not deleted from the SHS listing.
+ if (info.appId.isDefined) {
+ listing.write(info.copy(lastProcessed = newLastScanTime))
+ }
+ false
+ }
+ } catch {
+ case _: NoSuchElementException =>
+ // If the file is currently not being tracked by the SHS,
add an entry for it and try
+ // to parse it. This will allow the cleaner code to detect
the file as stale later on
+ // if it was not possible to parse it.
+ listing.write(LogInfo(entry.getPath().toString(),
newLastScanTime, None, None,
+ entry.getLen()))
+ entry.getLen() > 0
+ }
}
.sortWith { case (entry1, entry2) =>
entry1.getModificationTime() > entry2.getModificationTime()
}
- if (logInfos.nonEmpty) {
- logDebug(s"New/updated attempts found: ${logInfos.size}
${logInfos.map(_.getPath)}")
+ if (updated.nonEmpty) {
+ logDebug(s"New/updated attempts found: ${updated.size}
${updated.map(_.getPath)}")
}
- var tasks = mutable.ListBuffer[Future[_]]()
-
- try {
- for (file <- logInfos) {
- tasks += replayExecutor.submit(new Runnable {
- override def run(): Unit = mergeApplicationListing(file)
+ val tasks = updated.map { entry =>
+ try {
+ replayExecutor.submit(new Runnable {
+ override def run(): Unit = mergeApplicationListing(entry,
newLastScanTime)
})
+ } catch {
+ // let the iteration over logInfos break, since an exception on
--- End diff --
And actually, you've moved the try/catch, so this comment is no longer
accurate: you'll continue to submit all tasks even if one of them throws an
exception. (I guess I'm not really sure why the old code did it that way...)
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]