Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/18887#discussion_r140828783
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -422,208 +457,101 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}
- applications.get(appId) match {
- case Some(appInfo) =>
- try {
- // If no attempt is specified, or there is no attemptId for attempts, return all attempts
- appInfo.attempts.filter { attempt =>
- attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
- }.foreach { attempt =>
- val logPath = new Path(logDir, attempt.logPath)
- zipFileToStream(logPath, attempt.logPath, zipStream)
- }
- } finally {
- zipStream.close()
+ val app = try {
+ load(appId)
+ } catch {
+ case _: NoSuchElementException =>
+ throw new SparkException(s"Logs for $appId not found.")
+ }
+
+ try {
+ // If no attempt is specified, or there is no attemptId for attempts, return all attempts
+ attemptId
+ .map { id => app.attempts.filter(_.info.attemptId == Some(id)) }
+ .getOrElse(app.attempts)
+ .map(_.logPath)
+ .foreach { log =>
+ zipFileToStream(new Path(logDir, log), log, zipStream)
}
- case None => throw new SparkException(s"Logs for $appId not found.")
+ } finally {
+ zipStream.close()
}
}
/**
- * Replay the log files in the list and merge the list of old applications with new ones
+ * Replay the given log file, saving the application in the listing db.
*/
protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
- val newAttempts = try {
- val eventsFilter: ReplayEventsFilter = { eventString =>
- eventString.startsWith(APPL_START_EVENT_PREFIX) ||
- eventString.startsWith(APPL_END_EVENT_PREFIX) ||
- eventString.startsWith(LOG_START_EVENT_PREFIX)
- }
-
- val logPath = fileStatus.getPath()
- val appCompleted = isApplicationCompleted(fileStatus)
-
- // Use loading time as lastUpdated since some filesystems don't update modifiedTime
- // each time file is updated. However use modifiedTime for completed jobs so lastUpdated
- // won't change whenever HistoryServer restarts and reloads the file.
- val lastUpdated = if (appCompleted) fileStatus.getModificationTime else clock.getTimeMillis()
-
- val appListener = replay(fileStatus, appCompleted, new ReplayListenerBus(), eventsFilter)
-
- // Without an app ID, new logs will render incorrectly in the listing page, so do not list or
- // try to show their UI.
- if (appListener.appId.isDefined) {
- val attemptInfo = new FsApplicationAttemptInfo(
- logPath.getName(),
- appListener.appName.getOrElse(NOT_STARTED),
- appListener.appId.getOrElse(logPath.getName()),
- appListener.appAttemptId,
- appListener.startTime.getOrElse(-1L),
- appListener.endTime.getOrElse(-1L),
- lastUpdated,
- appListener.sparkUser.getOrElse(NOT_STARTED),
- appCompleted,
- fileStatus.getLen(),
- appListener.appSparkVersion.getOrElse("")
- )
- fileToAppInfo.put(logPath, attemptInfo)
- logDebug(s"Application log ${attemptInfo.logPath} loaded
successfully: $attemptInfo")
- Some(attemptInfo)
- } else {
- logWarning(s"Failed to load application log ${fileStatus.getPath}.
" +
- "The application may have not started.")
- None
- }
-
- } catch {
- case e: Exception =>
- logError(
- s"Exception encountered when attempting to load application log ${fileStatus.getPath}",
- e)
- None
+ val eventsFilter: ReplayEventsFilter = { eventString =>
+ eventString.startsWith(APPL_START_EVENT_PREFIX) ||
+ eventString.startsWith(APPL_END_EVENT_PREFIX) ||
+ eventString.startsWith(LOG_START_EVENT_PREFIX)
}
- if (newAttempts.isEmpty) {
- return
- }
-
- // Build a map containing all apps that contain new attempts. The app information in this map
- // contains both the new app attempt, and those that were already loaded in the existing apps
- // map. If an attempt has been updated, it replaces the old attempt in the list.
- val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]()
-
- applications.synchronized {
- newAttempts.foreach { attempt =>
- val appInfo = newAppMap.get(attempt.appId)
- .orElse(applications.get(attempt.appId))
- .map { app =>
- val attempts =
- app.attempts.filter(_.attemptId != attempt.attemptId) ++ List(attempt)
- new FsApplicationHistoryInfo(attempt.appId, attempt.name,
- attempts.sortWith(compareAttemptInfo))
- }
- .getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt)))
- newAppMap(attempt.appId) = appInfo
- }
-
- // Merge the new app list with the existing one, maintaining the expected ordering (descending
- // end time). Maintaining the order is important to avoid having to sort the list every time
- // there is a request for the log list.
- val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo)
- val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
- def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
- if (!mergedApps.contains(info.id)) {
- mergedApps += (info.id -> info)
- }
- }
+ val logPath = fileStatus.getPath()
+ logInfo(s"Replaying log path: $logPath")
- val newIterator = newApps.iterator.buffered
- val oldIterator = applications.values.iterator.buffered
- while (newIterator.hasNext && oldIterator.hasNext) {
- if (newAppMap.contains(oldIterator.head.id)) {
- oldIterator.next()
- } else if (compareAppInfo(newIterator.head, oldIterator.head)) {
- addIfAbsent(newIterator.next())
- } else {
- addIfAbsent(oldIterator.next())
- }
- }
- newIterator.foreach(addIfAbsent)
- oldIterator.foreach(addIfAbsent)
+ val bus = new ReplayListenerBus()
+ val listener = new AppListingListener(fileStatus, clock)
+ bus.addListener(listener)
- applications = mergedApps
- }
+ replay(fileStatus, isApplicationCompleted(fileStatus), bus, eventsFilter)
+ listener.applicationInfo.foreach(addListing)
+ listing.write(LogInfo(logPath.toString(), fileStatus.getLen()))
}
/**
* Delete event logs from the log directory according to the clean policy defined by the user.
*/
private[history] def cleanLogs(): Unit = {
+ var iterator: Option[KVStoreIterator[ApplicationInfoWrapper]] = None
try {
- val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000
-
- val now = clock.getTimeMillis()
- val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
-
- def shouldClean(attempt: FsApplicationAttemptInfo): Boolean = {
- now - attempt.lastUpdated > maxAge
- }
-
- // Scan all logs from the log directory.
- // Only completed applications older than the specified max age will be deleted.
- applications.values.foreach { app =>
- val (toClean, toRetain) = app.attempts.partition(shouldClean)
- attemptsToClean ++= toClean
-
- if (toClean.isEmpty) {
- appsToRetain += (app.id -> app)
- } else if (toRetain.nonEmpty) {
- appsToRetain += (app.id ->
- new FsApplicationHistoryInfo(app.id, app.name, toRetain.toList))
+ val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000
+
+ // Iterate descending over all applications whose oldest attempt happened before maxTime.
+ iterator = Some(listing.view(classOf[ApplicationInfoWrapper])
+ .index("oldestAttempt")
+ .reverse()
+ .first(maxTime)
+ .closeableIterator())
+
+ iterator.get.asScala.foreach { app =>
+ val (remaining, toDelete) = app.attempts.partition { attempt =>
--- End diff ---
Ah, you're talking about attempts, not applications.
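To make that distinction concrete, here is a minimal standalone sketch of the per-attempt split the new cleaner performs. The simplified `Attempt` and `App` types are hypothetical stand-ins, not the PR's actual `ApplicationInfoWrapper`: expired attempts are deleted individually, and an application entry is only dropped once no attempts remain.

    // Hypothetical simplified types, not the PR's actual classes.
    case class Attempt(lastUpdated: Long, logPath: String)
    case class App(id: String, attempts: List[Attempt])

    // Split one app's attempts into those to keep and those whose
    // lastUpdated falls before maxTime; the app entry itself survives
    // as long as at least one attempt remains.
    def clean(app: App, maxTime: Long): (Option[App], List[Attempt]) = {
      val (remaining, toDelete) = app.attempts.partition(_.lastUpdated >= maxTime)
      val updated = if (remaining.nonEmpty) Some(app.copy(attempts = remaining)) else None
      (updated, toDelete)
    }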
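Relatedly, the new writeEventLogs path in the diff above selects attempts with an Option-based idiom. A hedged sketch of that idiom in isolation, again using a simplified hypothetical Attempt type: when no attemptId is requested, every attempt's log is included; otherwise only the matching one.

    // Hypothetical simplified type for illustration only.
    case class Attempt(attemptId: Option[String], logPath: String)

    // With no requested id, return all attempts; otherwise only the match.
    def selectAttempts(attempts: Seq[Attempt], requested: Option[String]): Seq[Attempt] =
      requested
        .map { id => attempts.filter(_.attemptId.contains(id)) }
        .getOrElse(attempts)

    // selectAttempts(as, None) returns everything;
    // selectAttempts(as, Some("attempt-2")) returns only that attempt.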