GitHub user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18887#discussion_r139349515
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -720,19 +633,67 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           appId: String,
           attemptId: Option[String],
           prevFileSize: Long)(): Boolean = {
    -    lookup(appId, attemptId) match {
    -      case None =>
    -        logDebug(s"Application Attempt $appId/$attemptId not found")
    -        false
    -      case Some(latest) =>
    -        prevFileSize < latest.fileSize
    +    try {
    +      val attempt = getAttempt(appId, attemptId)
    +      val logPath = fs.makeQualified(new Path(logDir, attempt.logPath))
    +      recordedFileSize(logPath) > prevFileSize
    +    } catch {
    +      case _: NoSuchElementException => false
    +    }
    +  }
    +
    +  /**
    +   * Return the last known size of the given event log, recorded the last time the file
    +   * system scanner detected a change in the file.
    +   */
    +  private def recordedFileSize(log: Path): Long = {
    +    try {
    +      listing.read(classOf[LogInfo], log.toString()).fileSize
    +    } catch {
    +      case _: NoSuchElementException => 0L
         }
       }
    +
    +  private def load(appId: String): ApplicationInfoWrapper = {
    +    listing.read(classOf[ApplicationInfoWrapper], appId)
    +  }
    +
    +  /**
    +   * Write the app's information to the given store. Serialized to avoid the (notedly rare) case
    +   * where two threads are processing separate attempts of the same application.
    +   */
    +  private def addListing(app: ApplicationInfoWrapper): Unit = listing.synchronized {
    +    val attempt = app.attempts.head
    +
    +    val oldApp = try {
    +      listing.read(classOf[ApplicationInfoWrapper], app.id)
    +    } catch {
    +      case _: NoSuchElementException =>
    +        app
    +    }
    +
    +    def compareAttemptInfo(a1: AttemptInfoWrapper, a2: AttemptInfoWrapper): Boolean = {
    +      a1.info.startTime.getTime() > a2.info.startTime.getTime()
    +    }
    +
    +    val attempts = oldApp.attempts.filter(_.info.attemptId != attempt.info.attemptId) ++
    +      List(attempt)
    +
    +    val newAppInfo = new ApplicationInfoWrapper(
    +      app.info,
    +      attempts.sortWith(compareAttemptInfo))
    +    listing.write(newAppInfo)
    +  }
    +
    +  /** For testing. Returns internal data about a single attempt. */
    +  private[history] def getAttempt(appId: String, attemptId: Option[String]): AttemptInfoWrapper = {
    +    load(appId).attempts.find(_.info.attemptId == attemptId).getOrElse(
    +      throw new NoSuchElementException(s"Cannot find attempt $attemptId of $appId."))
    +  }
    +
     }
     
     private[history] object FsHistoryProvider {
    -  val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
    -
       private val NOT_STARTED = "<Not Started>"
    --- End diff --
    
    Do we still use this? Why is it not needed now?
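
    For context, here is a minimal standalone sketch of the attempt-merge behavior in addListing above; AttemptStub is a hypothetical stand-in for AttemptInfoWrapper and is not part of this PR:

        // Hypothetical stand-in for AttemptInfoWrapper; only the fields the merge needs.
        case class AttemptStub(attemptId: Option[String], startTime: Long)

        // Drop any stored attempt with the same id, append the incoming one, and keep
        // the list sorted newest-first -- mirroring the filter ++ sortWith(compareAttemptInfo)
        // steps in the diff.
        def mergeAttempt(existing: List[AttemptStub], incoming: AttemptStub): List[AttemptStub] = {
          val kept = existing.filter(_.attemptId != incoming.attemptId)
          (kept :+ incoming).sortWith(_.startTime > _.startTime)
        }

        // A re-parsed attempt with the same id replaces the stored entry:
        val merged = mergeAttempt(
          List(AttemptStub(Some("attempt-1"), startTime = 100L)),
          AttemptStub(Some("attempt-1"), startTime = 200L))
        // merged == List(AttemptStub(Some("attempt-1"), 200L))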

