Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/18887#discussion_r137942487
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -422,208 +455,101 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}
- applications.get(appId) match {
- case Some(appInfo) =>
- try {
- // If no attempt is specified, or there is no attemptId for attempts, return all attempts
- appInfo.attempts.filter { attempt =>
- attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
- }.foreach { attempt =>
- val logPath = new Path(logDir, attempt.logPath)
- zipFileToStream(logPath, attempt.logPath, zipStream)
- }
- } finally {
- zipStream.close()
+ val app = try {
+ load(appId)
+ } catch {
+ case _: NoSuchElementException =>
+ throw new SparkException(s"Logs for $appId not found.")
+ }
+
+ try {
+ // If no attempt is specified, or there is no attemptId for attempts, return all attempts
+ attemptId
+ .map { id => app.attempts.filter(_.info.attemptId == Some(id)) }
+ .getOrElse(app.attempts)
+ .map(_.logPath)
+ .foreach { log =>
+ zipFileToStream(new Path(logDir, log), log, zipStream)
}
- case None => throw new SparkException(s"Logs for $appId not found.")
+ } finally {
+ zipStream.close()
}
}
/**
- * Replay the log files in the list and merge the list of old applications with new ones
+ * Replay the given log file, saving the application in the listing db.
*/
protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
- val newAttempts = try {
- val eventsFilter: ReplayEventsFilter = { eventString =>
- eventString.startsWith(APPL_START_EVENT_PREFIX) ||
- eventString.startsWith(APPL_END_EVENT_PREFIX) ||
- eventString.startsWith(LOG_START_EVENT_PREFIX)
- }
-
- val logPath = fileStatus.getPath()
- val appCompleted = isApplicationCompleted(fileStatus)
-
- // Use loading time as lastUpdated since some filesystems don't update modifiedTime
- // each time file is updated. However use modifiedTime for completed jobs so lastUpdated
- // won't change whenever HistoryServer restarts and reloads the file.
- val lastUpdated = if (appCompleted) fileStatus.getModificationTime else clock.getTimeMillis()
--- End diff --
Is this logic still the same after this PR? Do we have any test cases to ensure it?
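
For context, the rule being asked about is roughly: completed applications keep the log file's modification time as `lastUpdated`, while in-progress applications use the load time. A minimal standalone sketch of that rule (illustrative names only, not the actual FsHistoryProvider code or its test suite):

```scala
// Illustrative sketch only -- not the actual FsHistoryProvider implementation.
object LastUpdatedRuleSketch {

  // Completed apps keep the log file's modification time so lastUpdated stays stable
  // across HistoryServer restarts; in-progress apps fall back to the load time because
  // some filesystems don't refresh modifiedTime on every append.
  def lastUpdated(appCompleted: Boolean, modificationTime: Long, loadTimeMillis: Long): Long =
    if (appCompleted) modificationTime else loadTimeMillis

  def main(args: Array[String]): Unit = {
    val modTime = 1000L
    val now = 5000L
    assert(lastUpdated(appCompleted = true, modTime, now) == modTime)  // completed: file mod time
    assert(lastUpdated(appCompleted = false, modTime, now) == now)     // in progress: load time
    println("lastUpdated rule holds in both cases")
  }
}
```

If the new listing-db code path preserves this behavior, a regression test asserting both branches would make that explicit.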