Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9571#discussion_r67242259
  
    --- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -278,55 +304,63 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
        * applications that haven't been updated since last time the logs were 
checked.
        */
       private[history] def checkForLogs(): Unit = {
    -    try {
    -      val newLastScanTime = getNewLastScanTime()
    -      logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
    -      val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
    -        .getOrElse(Seq[FileStatus]())
    -      // scan for modified applications, replay and merge them
    -      val logInfos: Seq[FileStatus] = statusList
    -        .filter { entry =>
    -          try {
    -            val prevFileSize = 
fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
    -            !entry.isDirectory() && prevFileSize < entry.getLen()
    -          } catch {
    -            case e: AccessControlException =>
    -              // Do not use "logInfo" since these messages can get pretty 
noisy if printed on
    -              // every poll.
    -              logDebug(s"No permission to read $entry, ignoring.")
    -              false
    +    metrics.updateCount.inc()
    +    metrics.updateLastAttempted.touch()
    +    time(metrics.updateTimer) {
    +      try {
    +        val newLastScanTime = getNewLastScanTime()
    +        logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
    +        val statusList = Option(fs.listStatus(new 
Path(logDir))).map(_.toSeq)
    +          .getOrElse(Seq[FileStatus]())
    +        // scan for modified applications, replay and merge them
    +        val logInfos: Seq[FileStatus] = statusList
    +          .filter { entry =>
    +            try {
    +              val prevFileSize = 
fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
    +              !entry.isDirectory() && prevFileSize < entry.getLen()
    +            } catch {
    +              case e: AccessControlException =>
    +                // Do not use "logInfo" since these messages can get 
pretty noisy if printed on
    +                // every poll.
    +                logDebug(s"No permission to read $entry, ignoring.")
    +                false
    +            }
               }
    +          .flatMap { entry => Some(entry) }
    +          .sortWith { case (entry1, entry2) =>
    +            entry1.getModificationTime() >= entry2.getModificationTime()
             }
    -        .flatMap { entry => Some(entry) }
    -        .sortWith { case (entry1, entry2) =>
    -          entry1.getModificationTime() >= entry2.getModificationTime()
    -      }
     
    -      if (logInfos.nonEmpty) {
    -        logDebug(s"New/updated attempts found: ${logInfos.size} 
${logInfos.map(_.getPath)}")
    -      }
    -      logInfos.map { file =>
    -          replayExecutor.submit(new Runnable {
    -            override def run(): Unit = mergeApplicationListing(file)
    -          })
    +        if (logInfos.nonEmpty) {
    +          logDebug(s"New/updated attempts found: ${logInfos.size} 
${logInfos.map(_.getPath)}")
             }
    -        .foreach { task =>
    -          try {
    -            // Wait for all tasks to finish. This makes sure that 
checkForLogs
    -            // is not scheduled again while some tasks are already running 
in
    -            // the replayExecutor.
    -            task.get()
    -          } catch {
    -            case e: InterruptedException =>
    -              throw e
    -            case e: Exception =>
    -              logError("Exception while merging application listings", e)
    +        logInfos.map { file =>
    +            replayExecutor.submit(new Runnable {
    +              override def run(): Unit = 
time(metrics.mergeApplicationListingTimer) {
    --- End diff --
    
    I commented later on the event-count metric update, but this timer would be 
more useful if it were correlated with that event count, and if it were more 
targeted (i.e. measuring just the replay, not including the time taken to merge 
the internal lists).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to