Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20138#discussion_r161880321
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -405,49 +404,70 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         try {
           val newLastScanTime = getNewLastScanTime()
           logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
    -      // scan for modified applications, replay and merge them
    -      val logInfos = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
    +
    +      val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
             .filter { entry =>
               !entry.isDirectory() &&
                 // FsHistoryProvider generates a hidden file which can't be read.  Accidentally
                 // reading a garbage file is safe, but we would log an error which can be scary to
                 // the end-user.
                 !entry.getPath().getName().startsWith(".") &&
    -            SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) &&
    -            recordedFileSize(entry.getPath()) < entry.getLen()
    +            SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
    +        }
    +        .filter { entry =>
    +          try {
    +            val info = listing.read(classOf[LogInfo], entry.getPath().toString())
    +            if (info.fileSize < entry.getLen()) {
    +              // Log size has changed, it should be parsed.
    +              true
    +            } else {
    +              // If the SHS view has a valid application, update the time the file was last seen so
    +              // that the entry is not deleted from the SHS listing.
    +              if (info.appId.isDefined) {
    +                listing.write(info.copy(lastProcessed = newLastScanTime))
    +              }
    +              false
    +            }
    +          } catch {
    +            case _: NoSuchElementException =>
    +              // If the file is currently not being tracked by the SHS, add an entry for it and try
    +              // to parse it. This will allow the cleaner code to detect the file as stale later on
    +              // if it was not possible to parse it.
    +              listing.write(LogInfo(entry.getPath().toString(), newLastScanTime, None, None,
    +                entry.getLen()))
    +              entry.getLen() > 0
    +          }
             }
             .sortWith { case (entry1, entry2) =>
               entry1.getModificationTime() > entry2.getModificationTime()
             }
     
    -      if (logInfos.nonEmpty) {
    -        logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
    +      if (updated.nonEmpty) {
    +        logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.getPath)}")
           }
     
    -      var tasks = mutable.ListBuffer[Future[_]]()
    -
    -      try {
    -        for (file <- logInfos) {
    -          tasks += replayExecutor.submit(new Runnable {
    -            override def run(): Unit = mergeApplicationListing(file)
    +      val tasks = updated.map { entry =>
    +        try {
    +          replayExecutor.submit(new Runnable {
    +            override def run(): Unit = mergeApplicationListing(entry, newLastScanTime)
               })
    +        } catch {
    +          // let the iteration over logInfos break, since an exception on
    --- End diff --
    
    Actually, `RejectedExecutionException` shouldn't ever be thrown here. The executor doesn't have a bounded queue, and it's very unlikely you'll ever submit `Integer.MAX_VALUE` tasks to it.
    
    The code didn't use to catch any exception here (the catch was added along with the comment in a531fe1). Catching the exception doesn't do any harm; I just don't think this handler will ever trigger.
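
To illustrate the point about the unbounded queue, here is a minimal sketch. It assumes the replay executor behaves like a plain `Executors.newFixedThreadPool` (backed by a `LinkedBlockingQueue` whose default capacity is `Integer.MAX_VALUE`); the object and method names (`UnboundedQueueDemo`, `countRejections`, `rejectedAfterShutdown`) are hypothetical and not part of Spark.

```scala
import java.util.concurrent.{Executors, RejectedExecutionException, TimeUnit}

// Hypothetical demo, not Spark code: a stand-in for the replay executor.
object UnboundedQueueDemo {

  // Submits `n` no-op tasks to a small fixed pool and counts rejected submissions.
  def countRejections(n: Int): Int = {
    // newFixedThreadPool queues excess tasks in an unbounded LinkedBlockingQueue.
    val pool = Executors.newFixedThreadPool(2)
    var rejected = 0
    try {
      val futures = (1 to n).flatMap { _ =>
        try {
          Some(pool.submit(new Runnable { override def run(): Unit = () }))
        } catch {
          case _: RejectedExecutionException =>
            rejected += 1
            None
        }
      }
      // Wait for every accepted task to finish.
      futures.foreach(_.get())
    } finally {
      pool.shutdown()
      pool.awaitTermination(10, TimeUnit.SECONDS)
    }
    rejected
  }

  // The realistic way to see a rejection: submit after the pool was shut down.
  def rejectedAfterShutdown(): Boolean = {
    val pool = Executors.newFixedThreadPool(1)
    pool.shutdown()
    try {
      pool.submit(new Runnable { override def run(): Unit = () })
      false
    } catch {
      case _: RejectedExecutionException => true
    }
  }
}
```

With far more tasks than threads, `countRejections` should still report zero rejections, because pending tasks simply wait in the queue. In this sketch a rejection only shows up once the pool has been shut down, which is why the `catch` in the diff is harmless but unlikely to run during normal operation.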

