Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1222#discussion_r21726971
  
    --- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -217,37 +234,92 @@ private[history] class FsHistoryProvider(conf: 
SparkConf) extends ApplicationHis
             applications = newApps
           }
         } catch {
    -      case t: Throwable => logError("Exception in checking for event log 
updates", t)
    +      case e: Exception => logError("Exception in checking for event log 
updates", e)
         }
       }
     
    -  private def createReplayBus(logDir: FileStatus): (ReplayListenerBus, 
ApplicationEventListener) = {
    -    val path = logDir.getPath()
    -    val elogInfo = EventLoggingListener.parseLoggingInfo(path, fs)
    -    val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, 
elogInfo.compressionCodec)
    -    val appListener = new ApplicationEventListener
    -    replayBus.addListener(appListener)
    -    (replayBus, appListener)
    +  /**
    +   * Replays the events in the specified log file and returns information 
about the associated
    +   * application.
    +   */
    +  private def replay(eventLog: FileStatus, bus: ReplayListenerBus): 
FsApplicationHistoryInfo = {
    +    val logPath = eventLog.getPath()
    +    val (logInput, sparkVersion) =
    +      if (isLegacyLogDirectory(eventLog)) {
    +        openLegacyEventLog(logPath)
    +      } else {
    +        EventLoggingListener.openEventLog(logPath, fs)
    +      }
    +    try {
    +      val appListener = new ApplicationEventListener
    +      bus.addListener(appListener)
    +      bus.replay(logInput, sparkVersion)
    +      new FsApplicationHistoryInfo(
    +        logPath.getName(),
    +        appListener.appId.getOrElse(logPath.getName()),
    +        appListener.appName.getOrElse(NOT_STARTED),
    +        appListener.startTime.getOrElse(-1L),
    +        appListener.endTime.getOrElse(-1L),
    +        getModificationTime(eventLog),
    +        appListener.sparkUser.getOrElse(NOT_STARTED))
    +    } finally {
    +      logInput.close()
    +    }
       }
     
    -  /** Return when this directory was last modified. */
    -  private def getModificationTime(dir: FileStatus): Long = {
    -    try {
    -      val logFiles = fs.listStatus(dir.getPath)
    -      if (logFiles != null && !logFiles.isEmpty) {
    -        logFiles.map(_.getModificationTime).max
    -      } else {
    -        dir.getModificationTime
    +  /**
    +   * Loads a legacy log directory. This assumes that the log directory 
contains a single event
    +   * log file (along with other metadata files), which is the case for 
directories generated by
    +   * the code in previous releases.
    +   */
    --- End diff --
    
    // Returns a 2-tuple of (an input stream of the events, and the version of 
Spark in which the log was written).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to