GitHub user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18887#discussion_r140830146
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -422,208 +457,101 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           }
         }
     
    -    applications.get(appId) match {
    -      case Some(appInfo) =>
    -        try {
    -          // If no attempt is specified, or there is no attemptId for attempts, return all attempts
    -          appInfo.attempts.filter { attempt =>
    -            attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
    -          }.foreach { attempt =>
    -            val logPath = new Path(logDir, attempt.logPath)
    -            zipFileToStream(logPath, attempt.logPath, zipStream)
    -          }
    -        } finally {
    -          zipStream.close()
    +    val app = try {
    +      load(appId)
    +    } catch {
    +      case _: NoSuchElementException =>
    +        throw new SparkException(s"Logs for $appId not found.")
    +    }
    +
    +    try {
    +      // If no attempt is specified, or there is no attemptId for attempts, return all attempts
    +      attemptId
    +        .map { id => app.attempts.filter(_.info.attemptId == Some(id)) }
    +        .getOrElse(app.attempts)
    +        .map(_.logPath)
    +        .foreach { log =>
    +          zipFileToStream(new Path(logDir, log), log, zipStream)
             }
    -      case None => throw new SparkException(s"Logs for $appId not found.")
    +    } finally {
    +      zipStream.close()
         }
       }
     
       /**
    -   * Replay the log files in the list and merge the list of old applications with new ones
    +   * Replay the given log file, saving the application in the listing db.
        */
       protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
    -    val newAttempts = try {
    -      val eventsFilter: ReplayEventsFilter = { eventString =>
    -        eventString.startsWith(APPL_START_EVENT_PREFIX) ||
    -          eventString.startsWith(APPL_END_EVENT_PREFIX) ||
    -          eventString.startsWith(LOG_START_EVENT_PREFIX)
    -      }
    -
    -      val logPath = fileStatus.getPath()
    -      val appCompleted = isApplicationCompleted(fileStatus)
    -
    -      // Use loading time as lastUpdated since some filesystems don't update modifiedTime
    -      // each time file is updated. However use modifiedTime for completed jobs so lastUpdated
    -      // won't change whenever HistoryServer restarts and reloads the file.
    -      val lastUpdated = if (appCompleted) fileStatus.getModificationTime else clock.getTimeMillis()
    -
    -      val appListener = replay(fileStatus, appCompleted, new ReplayListenerBus(), eventsFilter)
    -
    -      // Without an app ID, new logs will render incorrectly in the listing page, so do not list or
    -      // try to show their UI.
    -      if (appListener.appId.isDefined) {
    -        val attemptInfo = new FsApplicationAttemptInfo(
    -          logPath.getName(),
    -          appListener.appName.getOrElse(NOT_STARTED),
    -          appListener.appId.getOrElse(logPath.getName()),
    -          appListener.appAttemptId,
    -          appListener.startTime.getOrElse(-1L),
    -          appListener.endTime.getOrElse(-1L),
    -          lastUpdated,
    -          appListener.sparkUser.getOrElse(NOT_STARTED),
    -          appCompleted,
    -          fileStatus.getLen(),
    -          appListener.appSparkVersion.getOrElse("")
    -        )
    -        fileToAppInfo.put(logPath, attemptInfo)
    -        logDebug(s"Application log ${attemptInfo.logPath} loaded successfully: $attemptInfo")
    -        Some(attemptInfo)
    -      } else {
    -        logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
    -          "The application may have not started.")
    -        None
    -      }
    -
    -    } catch {
    -      case e: Exception =>
    -        logError(
    -          s"Exception encountered when attempting to load application log 
${fileStatus.getPath}",
    -          e)
    -        None
    +    val eventsFilter: ReplayEventsFilter = { eventString =>
    +      eventString.startsWith(APPL_START_EVENT_PREFIX) ||
    +        eventString.startsWith(APPL_END_EVENT_PREFIX) ||
    +        eventString.startsWith(LOG_START_EVENT_PREFIX)
         }
     
    -    if (newAttempts.isEmpty) {
    -      return
    -    }
    -
    -    // Build a map containing all apps that contain new attempts. The app information in this map
    -    // contains both the new app attempt, and those that were already loaded in the existing apps
    -    // map. If an attempt has been updated, it replaces the old attempt in the list.
    -    val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]()
    -
    -    applications.synchronized {
    -      newAttempts.foreach { attempt =>
    -        val appInfo = newAppMap.get(attempt.appId)
    -          .orElse(applications.get(attempt.appId))
    -          .map { app =>
    -            val attempts =
    -              app.attempts.filter(_.attemptId != attempt.attemptId) ++ List(attempt)
    -            new FsApplicationHistoryInfo(attempt.appId, attempt.name,
    -              attempts.sortWith(compareAttemptInfo))
    -          }
    -          .getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt)))
    -        newAppMap(attempt.appId) = appInfo
    -      }
    -
    -      // Merge the new app list with the existing one, maintaining the expected ordering (descending
    -      // end time). Maintaining the order is important to avoid having to sort the list every time
    -      // there is a request for the log list.
    -      val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo)
    -      val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
    -      def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
    -        if (!mergedApps.contains(info.id)) {
    -          mergedApps += (info.id -> info)
    -        }
    -      }
    +    val logPath = fileStatus.getPath()
    +    logInfo(s"Replaying log path: $logPath")
     
    -      val newIterator = newApps.iterator.buffered
    -      val oldIterator = applications.values.iterator.buffered
    -      while (newIterator.hasNext && oldIterator.hasNext) {
    -        if (newAppMap.contains(oldIterator.head.id)) {
    -          oldIterator.next()
    -        } else if (compareAppInfo(newIterator.head, oldIterator.head)) {
    -          addIfAbsent(newIterator.next())
    -        } else {
    -          addIfAbsent(oldIterator.next())
    -        }
    -      }
    -      newIterator.foreach(addIfAbsent)
    -      oldIterator.foreach(addIfAbsent)
    +    val bus = new ReplayListenerBus()
    +    val listener = new AppListingListener(fileStatus, clock)
    +    bus.addListener(listener)
     
    -      applications = mergedApps
    -    }
    +    replay(fileStatus, isApplicationCompleted(fileStatus), bus, eventsFilter)
    +    listener.applicationInfo.foreach(addListing)
    +    listing.write(LogInfo(logPath.toString(), fileStatus.getLen()))
       }
     
       /**
       * Delete event logs from the log directory according to the clean policy defined by the user.
        */
       private[history] def cleanLogs(): Unit = {
    +    var iterator: Option[KVStoreIterator[ApplicationInfoWrapper]] = None
         try {
    -      val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000
    -
    -      val now = clock.getTimeMillis()
    -      val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
    -
    -      def shouldClean(attempt: FsApplicationAttemptInfo): Boolean = {
    -        now - attempt.lastUpdated > maxAge
    -      }
    -
    -      // Scan all logs from the log directory.
    -      // Only completed applications older than the specified max age will be deleted.
    -      applications.values.foreach { app =>
    -        val (toClean, toRetain) = app.attempts.partition(shouldClean)
    -        attemptsToClean ++= toClean
    -
    -        if (toClean.isEmpty) {
    -          appsToRetain += (app.id -> app)
    -        } else if (toRetain.nonEmpty) {
    -          appsToRetain += (app.id ->
    -            new FsApplicationHistoryInfo(app.id, app.name, toRetain.toList))
    +      val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000
    +
    +      // Iterate descending over all applications whose oldest attempt happened before maxTime.
    +      iterator = Some(listing.view(classOf[ApplicationInfoWrapper])
    +        .index("oldestAttempt")
    +        .reverse()
    +        .first(maxTime)
    +        .closeableIterator())
    +
    +      iterator.get.asScala.foreach { app =>
    +        val (remaining, toDelete) = app.attempts.partition { attempt =>
    +          attempt.info.lastUpdated.getTime() >= maxTime
    +        }
    +        if (remaining.nonEmpty) {
    +          val newApp = new ApplicationInfoWrapper(app.info, remaining)
    +          listing.write(newApp)
    +        } else {
    +          listing.delete(app.getClass(), app.id)
    --- End diff ---
    
    Sure. But that doesn't really make it safe. For example, what if deleting one attempt's logs succeeds, but deleting a second one's fails? The listing would still point at logs that were already deleted, unless you update the listing after each deletion; and if that listing update itself fails, you still end up in the same situation.
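    
    To make that concrete, here's a rough sketch of the "update the listing after each deletion" approach. It's illustrative only: `deleteLog` and `cleanAppIncrementally` are made-up names, while the other identifiers mirror this diff. Note the window that stays open if the `listing.write()` itself fails:
    
    ```scala
    // Illustrative sketch, not the actual implementation: `deleteLog` is a
    // stand-in for whatever removes the event log files from `logDir`.
    private def cleanAppIncrementally(app: ApplicationInfoWrapper, maxTime: Long): Unit = {
      var kept = app.attempts
      app.attempts.filter(_.info.lastUpdated.getTime() < maxTime).foreach { attempt =>
        // If this throws on the second attempt, the first attempt's logs are
        // already gone, so the listing has to be updated as we go.
        deleteLog(new Path(logDir, attempt.logPath))
        kept = kept.filterNot(_ eq attempt)
        // Persist the new state immediately so the listing stops pointing at
        // logs that no longer exist. If this write fails, the listing still
        // references deleted logs, which is the case the recovery code in
        // later milestones handles.
        if (kept.nonEmpty) {
          listing.write(new ApplicationInfoWrapper(app.info, kept))
        } else {
          listing.delete(app.getClass(), app.id)
        }
      }
    }
    ```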
    
    Later milestones have code that recovers from those situations, though.

