GitHub user ajbozarth commented on a diff in the pull request:
https://github.com/apache/spark/pull/18887#discussion_r132770167
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -422,208 +454,101 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       }
     }
 
-    applications.get(appId) match {
-      case Some(appInfo) =>
-        try {
-          // If no attempt is specified, or there is no attemptId for attempts, return all attempts
-          appInfo.attempts.filter { attempt =>
-            attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
-          }.foreach { attempt =>
-            val logPath = new Path(logDir, attempt.logPath)
-            zipFileToStream(logPath, attempt.logPath, zipStream)
-          }
-        } finally {
-          zipStream.close()
+    val app = try {
+      load(appId)
+    } catch {
+      case _: NoSuchElementException =>
+        throw new SparkException(s"Logs for $appId not found.")
+    }
+
+    try {
+      // If no attempt is specified, or there is no attemptId for attempts, return all attempts
+      attemptId
+        .map { id => app.attempts.filter(_.info.attemptId == Some(id)) }
+        .getOrElse(app.attempts)
+        .map(_.logPath)
+        .foreach { log =>
+          zipFileToStream(new Path(logDir, log), log, zipStream)
         }
-      case None => throw new SparkException(s"Logs for $appId not found.")
+    } finally {
+      zipStream.close()
     }
   }
 
   /**
-   * Replay the log files in the list and merge the list of old applications with new ones
+   * Replay the given log file, saving the application in the listing db.
    */
   protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
-    val newAttempts = try {
-      val eventsFilter: ReplayEventsFilter = { eventString =>
-        eventString.startsWith(APPL_START_EVENT_PREFIX) ||
-          eventString.startsWith(APPL_END_EVENT_PREFIX) ||
-          eventString.startsWith(LOG_START_EVENT_PREFIX)
-      }
-
-      val logPath = fileStatus.getPath()
-      val appCompleted = isApplicationCompleted(fileStatus)
-
-      // Use loading time as lastUpdated since some filesystems don't update modifiedTime
-      // each time file is updated. However use modifiedTime for completed jobs so lastUpdated
-      // won't change whenever HistoryServer restarts and reloads the file.
-      val lastUpdated = if (appCompleted) fileStatus.getModificationTime else clock.getTimeMillis()
-
-      val appListener = replay(fileStatus, appCompleted, new ReplayListenerBus(), eventsFilter)
-
-      // Without an app ID, new logs will render incorrectly in the listing page, so do not list or
-      // try to show their UI.
-      if (appListener.appId.isDefined) {
-        val attemptInfo = new FsApplicationAttemptInfo(
-          logPath.getName(),
-          appListener.appName.getOrElse(NOT_STARTED),
-          appListener.appId.getOrElse(logPath.getName()),
-          appListener.appAttemptId,
-          appListener.startTime.getOrElse(-1L),
-          appListener.endTime.getOrElse(-1L),
-          lastUpdated,
-          appListener.sparkUser.getOrElse(NOT_STARTED),
-          appCompleted,
-          fileStatus.getLen(),
-          appListener.appSparkVersion.getOrElse("")
-        )
-        fileToAppInfo.put(logPath, attemptInfo)
-        logDebug(s"Application log ${attemptInfo.logPath} loaded successfully: $attemptInfo")
-        Some(attemptInfo)
-      } else {
-        logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
-          "The application may have not started.")
-        None
-      }
-
-    } catch {
-      case e: Exception =>
-        logError(
-          s"Exception encountered when attempting to load application log ${fileStatus.getPath}",
-          e)
-        None
-    }
-
-    if (newAttempts.isEmpty) {
-      return
+    val eventsFilter: ReplayEventsFilter = { eventString =>
+      eventString.startsWith(APPL_START_EVENT_PREFIX) ||
+        eventString.startsWith(APPL_END_EVENT_PREFIX) ||
+        eventString.startsWith(LOG_START_EVENT_PREFIX)
     }
 
-    // Build a map containing all apps that contain new attempts. The app information in this map
-    // contains both the new app attempt, and those that were already loaded in the existing apps
-    // map. If an attempt has been updated, it replaces the old attempt in the list.
-    val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]()
-
-    applications.synchronized {
-      newAttempts.foreach { attempt =>
-        val appInfo = newAppMap.get(attempt.appId)
-          .orElse(applications.get(attempt.appId))
-          .map { app =>
-            val attempts =
-              app.attempts.filter(_.attemptId != attempt.attemptId) ++ List(attempt)
-            new FsApplicationHistoryInfo(attempt.appId, attempt.name,
-              attempts.sortWith(compareAttemptInfo))
-          }
-          .getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt)))
-        newAppMap(attempt.appId) = appInfo
-      }
-
-      // Merge the new app list with the existing one, maintaining the expected ordering (descending
-      // end time). Maintaining the order is important to avoid having to sort the list every time
-      // there is a request for the log list.
-      val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo)
-      val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
-      def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
-        if (!mergedApps.contains(info.id)) {
-          mergedApps += (info.id -> info)
-        }
-      }
+    val logPath = fileStatus.getPath()
+    logInfo(s"Replaying log path: $logPath")
 
-      val newIterator = newApps.iterator.buffered
-      val oldIterator = applications.values.iterator.buffered
-      while (newIterator.hasNext && oldIterator.hasNext) {
-        if (newAppMap.contains(oldIterator.head.id)) {
-          oldIterator.next()
-        } else if (compareAppInfo(newIterator.head, oldIterator.head)) {
-          addIfAbsent(newIterator.next())
-        } else {
-          addIfAbsent(oldIterator.next())
-        }
-      }
-      newIterator.foreach(addIfAbsent)
-      oldIterator.foreach(addIfAbsent)
+    val bus = new ReplayListenerBus()
+    val listener = new AppListingListener(fileStatus, clock)
+    bus.addListener(listener)
 
-      applications = mergedApps
-    }
+    replay(fileStatus, isApplicationCompleted(fileStatus), bus, eventsFilter)
+    listener.applicationInfo.foreach(addListing)
+    listing.write(new LogInfo(logPath.toString(), fileStatus.getLen()))
   }
 
   /**
    * Delete event logs from the log directory according to the clean policy defined by the user.
    */
   private[history] def cleanLogs(): Unit = {
+    var iterator: Option[KVStoreIterator[ApplicationInfoWrapper]] = None
     try {
-      val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000
-
-      val now = clock.getTimeMillis()
-      val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
-
-      def shouldClean(attempt: FsApplicationAttemptInfo): Boolean = {
-        now - attempt.lastUpdated > maxAge
-      }
-
-      // Scan all logs from the log directory.
-      // Only completed applications older than the specified max age will be deleted.
-      applications.values.foreach { app =>
-        val (toClean, toRetain) = app.attempts.partition(shouldClean)
-        attemptsToClean ++= toClean
-
-        if (toClean.isEmpty) {
-          appsToRetain += (app.id -> app)
-        } else if (toRetain.nonEmpty) {
-          appsToRetain += (app.id ->
-            new FsApplicationHistoryInfo(app.id, app.name, toRetain.toList))
+      val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000
+
+      // Iterate descending over all applications whose oldest attempt is older than the maxAge.
--- End diff ---

`maxAge` -> `maxTime`, to match the variable defined just above.
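
For reference, a minimal sketch of the suggested wording, assuming only the names already used in the new code above (`clock`, `conf`, `MAX_LOG_AGE_S`); the rest of `cleanLogs()` is elided:

```scala
// Sketch of the suggested fix inside cleanLogs(); only the comment text changes.
val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000

// Iterate descending over all applications whose oldest attempt is older than maxTime.
```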