Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6935#discussion_r52202867
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -415,8 +488,59 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         }
         newIterator.foreach(addIfAbsent)
         oldIterator.foreach(addIfAbsent)
    +    logDebug(s" completed apps=${mergedApps.count(_._2.completed)}")
    +    logDebug(s" incomplete apps=${mergedApps.count(!_._2.completed)}")
    +    mergedApps
    +  }
    +
     
    -    applications = mergedApps
    +  /**
    +   * Build list of incomplete apps that have been updated since they were last examined.
    +   *
    +   * After the scan, if there were any updated attempts, [[applications]] is updated
    +   * with the new values.
    +   *
    +   * 1. No attempt to replay the application is made; this scan is a low-cost operation.
    +   * 2. As this overwrites [[applications]] with a new value, it must not run concurrently
    +   * with the main scan for new applications. That is: it must be in the [[checkForLogs()]]
    +   * operation.
    +   * 3. If an attempt's files are no longer present, the existing attempt is not considered
    +   * out of date or otherwise modified.
    +   */
    +  private[history] def scanAndUpdateIncompleteAttemptInfo(): Unit = {
    +    val newAttempts: Iterable[FsApplicationAttemptInfo] = applications
    +        .filter( e => !e._2.completed)
    +        .flatMap { e =>
    +          // build list of (false, attempt) or (true, attempt') values
    +          e._2.attempts.flatMap { prevInfo: FsApplicationAttemptInfo =>
    +            val path = new Path(logDir, prevInfo.logPath)
    +            try {
    +              val status = fs.getFileStatus(path)
    +              val size = status.getLen
    +              val aS = prevInfo.fileSize
    +              if (size > aS) {
    +                logDebug(s"Attempt ${prevInfo.name}/${prevInfo.appId} size => $size")
    +                Some(new FsApplicationAttemptInfo(prevInfo.logPath, prevInfo.name, prevInfo.appId,
    +                  prevInfo.attemptId, prevInfo.startTime, prevInfo.endTime, prevInfo.lastUpdated,
    +                  prevInfo.sparkUser, prevInfo.completed, size))
    --- End diff --
    
    this is the part that got me thinking about the alternate version in
    https://github.com/apache/spark/pull/11118.  I thought it seemed wrong that
    you were using `prevInfo.completed` -- if the app had transitioned to
    completed, wouldn't you need to put the new status in?  Well, it turns out
    you don't, because `checkForLogs` / `mergeApplicationListings` will already
    re-scan the existing in-progress attempts and update them if necessary
    (though there is a filesize vs. timestamp issue).  And that is ultimately
    what transitions the app to completed status, since this scan can't do it.
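    
    To spell out why that works, here is a minimal toy model of the two passes
    (the names `HistoryModel`, `LogFile`, `Attempt`, `mainScan` and
    `sizeOnlyRefresh` are made up for illustration, not the real
    `FsHistoryProvider` API):
    
    ```scala
    object HistoryModel {
      // A log file on disk, and the attempt info kept for it.
      case class LogFile(path: String, size: Long, mtime: Long, complete: Boolean)
      case class Attempt(path: String, fileSize: Long, completed: Boolean)
    
      // Analogue of checkForLogs / mergeApplicationListings in this toy model:
      // any log modified since the last scan is replayed in full, which is where
      // the completed flag is (re)derived from the log itself.
      def mainScan(logs: Seq[LogFile], lastScanTime: Long): Seq[Attempt] =
        logs.filter(_.mtime > lastScanTime)
            .map(l => Attempt(l.path, l.size, l.complete))
    
      // Analogue of the hunk above: only the recorded size is refreshed, and the
      // old completed flag is carried forward unchanged -- the prevInfo.completed
      // pattern under discussion.
      def sizeOnlyRefresh(prev: Attempt, current: LogFile): Attempt =
        if (current.size > prev.fileSize) prev.copy(fileSize = current.size)
        else prev
    }
    ```
    
    In this model the cheap refresh never flips `completed`, but the next full
    scan does, because a freshly finished log has a newer mtime and gets
    replayed; it also shows where a size-vs-timestamp mismatch can creep in,
    since the two passes key on different properties of the file.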
    
    And that led me to think that maybe we should just leverage that scan 
behavior rather than doing something more complicated.
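    
    Purely as an illustration of that direction (my sketch, not what either PR
    actually implements), "leveraging the scan" could be as small as a predicate
    the main scan consults before replaying an attempt, keyed on the size it
    recorded on the previous pass:
    
    ```scala
    // Hypothetical helper; prevRecordedSize stands in for whatever size the
    // existing attempt info stored on the previous pass.
    def shouldReplay(currentLen: Long, prevRecordedSize: Option[Long]): Boolean =
      prevRecordedSize.forall(currentLen > _)   // replay if unseen or grown
    ```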

