Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18887#discussion_r138490176
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -422,208 +455,101 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           }
         }
     
    -    applications.get(appId) match {
    -      case Some(appInfo) =>
    -        try {
    -          // If no attempt is specified, or there is no attemptId for attempts, return all attempts
    -          appInfo.attempts.filter { attempt =>
    -            attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
    -          }.foreach { attempt =>
    -            val logPath = new Path(logDir, attempt.logPath)
    -            zipFileToStream(logPath, attempt.logPath, zipStream)
    -          }
    -        } finally {
    -          zipStream.close()
    +    val app = try {
    +      load(appId)
    +    } catch {
    +      case _: NoSuchElementException =>
    +        throw new SparkException(s"Logs for $appId not found.")
    +    }
    +
    +    try {
    +      // If no attempt is specified, or there is no attemptId for attempts, return all attempts
    +      attemptId
    +        .map { id => app.attempts.filter(_.info.attemptId == Some(id)) }
    +        .getOrElse(app.attempts)
    +        .map(_.logPath)
    +        .foreach { log =>
    +          zipFileToStream(new Path(logDir, log), log, zipStream)
             }
    -      case None => throw new SparkException(s"Logs for $appId not found.")
    +    } finally {
    +      zipStream.close()
         }
       }
     
       /**
    -   * Replay the log files in the list and merge the list of old applications with new ones
    +   * Replay the given log file, saving the application in the listing db.
        */
       protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
    -    val newAttempts = try {
    -      val eventsFilter: ReplayEventsFilter = { eventString =>
    -        eventString.startsWith(APPL_START_EVENT_PREFIX) ||
    -          eventString.startsWith(APPL_END_EVENT_PREFIX) ||
    -          eventString.startsWith(LOG_START_EVENT_PREFIX)
    -      }
    -
    -      val logPath = fileStatus.getPath()
    -      val appCompleted = isApplicationCompleted(fileStatus)
    -
    -      // Use loading time as lastUpdated since some filesystems don't update modifiedTime
    -      // each time file is updated. However use modifiedTime for completed jobs so lastUpdated
    -      // won't change whenever HistoryServer restarts and reloads the file.
    -      val lastUpdated = if (appCompleted) fileStatus.getModificationTime else clock.getTimeMillis()
    --- End diff --
    
    Yes, the behavior is the same. It's now handled in `AppListingListener` (`onApplicationStart` and `onApplicationEnd` callbacks).
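    
    For reference, the relevant part of the listener looks roughly like this (a sketch of the idea only, not the exact code in this PR; the `lastUpdated` field is illustrative):
    
    ```scala
    // Sketch only -- `lastUpdated` is an illustrative field, not necessarily the
    // exact member the PR's AppListingListener updates.
    import java.util.Date
    
    import org.apache.hadoop.fs.FileStatus
    import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd,
      SparkListenerApplicationStart}
    import org.apache.spark.util.Clock
    
    private class AppListingListenerSketch(log: FileStatus, clock: Clock) extends SparkListener {
    
      var lastUpdated: Date = _
    
      override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
        // In-progress app: use loading time, since some filesystems don't update
        // modifiedTime every time the file is appended to.
        lastUpdated = new Date(clock.getTimeMillis())
      }
    
      override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
        // Completed app: use the log file's modification time so lastUpdated doesn't
        // change every time the history server restarts and reloads the file.
        lastUpdated = new Date(log.getModificationTime())
      }
    }
    ```
    
    The completed vs. in-progress distinction that used to be computed in `mergeApplicationListing` now falls out of which callback fires, since `onApplicationEnd` only shows up in logs of applications that finished.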
    
    I don't see an explicit test for this behavior.
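    
    If we wanted one, it would look something like this (very rough sketch; `newLogFile`, `writeFile` and `createTestConf` stand in for whatever helpers FsHistoryProviderSuite actually provides, and the listing field names are approximate):
    
    ```scala
    // Rough test sketch -- helper and listing field names are assumptions; the
    // assertions are the point: completed apps keep the log file's mtime as
    // lastUpdated, in-progress apps use the load time from the clock.
    test("lastUpdated: mtime for completed apps, clock time for in-progress apps") {
      val clock = new ManualClock(12345678L)
      val provider = new FsHistoryProvider(createTestConf(), clock)
    
      val completed = newLogFile("app1", None, inProgress = false)
      writeFile(completed,
        SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", None),
        SparkListenerApplicationEnd(2L))
    
      val inProgress = newLogFile("app2", None, inProgress = true)
      writeFile(inProgress,
        SparkListenerApplicationStart("app2", Some("app2"), 1L, "test", None))
    
      provider.checkForLogs()
    
      val listing = provider.getListing().toSeq
      val completedAttempt = listing.find(_.id == "app1").get.attempts.head
      val runningAttempt = listing.find(_.id == "app2").get.attempts.head
    
      assert(completedAttempt.lastUpdated.getTime === completed.lastModified())
      assert(runningAttempt.lastUpdated.getTime === clock.getTimeMillis())
    }
    ```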


