GitHub user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18887#discussion_r140689640
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -720,75 +634,218 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           appId: String,
           attemptId: Option[String],
           prevFileSize: Long)(): Boolean = {
    -    lookup(appId, attemptId) match {
    -      case None =>
    -        logDebug(s"Application Attempt $appId/$attemptId not found")
    -        false
    -      case Some(latest) =>
    -        prevFileSize < latest.fileSize
    +    try {
    +      val attempt = getAttempt(appId, attemptId)
    +      val logPath = fs.makeQualified(new Path(logDir, attempt.logPath))
    +      recordedFileSize(logPath) > prevFileSize
    +    } catch {
    +      case _: NoSuchElementException => false
         }
       }
    -}
     
    -private[history] object FsHistoryProvider {
    -  val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
    +  /**
    +   * Return the last known size of the given event log, recorded the last time the file
    +   * system scanner detected a change in the file.
    +   */
    +  private def recordedFileSize(log: Path): Long = {
    +    try {
    +      listing.read(classOf[LogInfo], log.toString()).fileSize
    +    } catch {
    +      case _: NoSuchElementException => 0L
    +    }
    +  }
    +
    +  private def load(appId: String): ApplicationInfoWrapper = {
    +    listing.read(classOf[ApplicationInfoWrapper], appId)
    +  }
     
    -  private val NOT_STARTED = "<Not Started>"
    +  /**
    +   * Write the app's information to the given store. Serialized to avoid the (notedly rare) case
    +   * where two threads are processing separate attempts of the same application.
    +   */
    +  private def addListing(app: ApplicationInfoWrapper): Unit = listing.synchronized {
    +    val attempt = app.attempts.head
    +
    +    val oldApp = try {
    +      listing.read(classOf[ApplicationInfoWrapper], app.id)
    +    } catch {
    +      case _: NoSuchElementException =>
    +        app
    +    }
     
    +    def compareAttemptInfo(a1: AttemptInfoWrapper, a2: AttemptInfoWrapper): Boolean = {
    +      a1.info.startTime.getTime() > a2.info.startTime.getTime()
    +    }
    +
    +    val attempts = oldApp.attempts.filter(_.info.attemptId != attempt.info.attemptId) ++
    +      List(attempt)
    +
    +    val newAppInfo = new ApplicationInfoWrapper(
    +      app.info,
    +      attempts.sortWith(compareAttemptInfo))
    +    listing.write(newAppInfo)
    +  }
    +
    +  /** For testing. Returns internal data about a single attempt. */
    +  private[history] def getAttempt(appId: String, attemptId: Option[String]): AttemptInfoWrapper = {
    +    load(appId).attempts.find(_.info.attemptId == attemptId).getOrElse(
    +      throw new NoSuchElementException(s"Cannot find attempt $attemptId of $appId."))
    +  }
    +
    +}
    +
    +private[history] object FsHistoryProvider {
       private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads"
     
       private val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\""
     
       private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\""
     
       private val LOG_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerLogStart\""
    +
    +  /**
    +   * Current version of the data written to the listing database. When opening an existing
    +   * db, if the version does not match this value, the FsHistoryProvider will throw away
    +   * all data and re-generate the listing data from the event logs.
    +   */
    +  private[history] val CURRENT_LISTING_VERSION = 1L
     }
     
     /**
    - * Application attempt information.
    - *
    - * @param logPath path to the log file, or, for a legacy log, its directory
    - * @param name application name
    - * @param appId application ID
    - * @param attemptId optional attempt ID
    - * @param startTime start time (from playback)
    - * @param endTime end time (from playback). -1 if the application is incomplete.
    - * @param lastUpdated the modification time of the log file when this entry was built by replaying
    - *                    the history.
    - * @param sparkUser user running the application
    - * @param completed flag to indicate whether or not the application has completed.
    - * @param fileSize the size of the log file the last time the file was scanned for changes
    + * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as
    + * the API serializer.
      */
    -private class FsApplicationAttemptInfo(
    +private class KVStoreScalaSerializer extends KVStoreSerializer {
    +
    +  mapper.registerModule(DefaultScalaModule)
    +  mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
    +  mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat)
    +
    +}
    +
    +private[history] case class KVStoreMetadata(
    +  version: Long,
    +  logDir: String)
    +
    +private[history] case class LogInfo(
    +  @KVIndexParam logPath: String,
    +  fileSize: Long)
    +
    +private[history] class AttemptInfoWrapper(
    +    val info: v1.ApplicationAttemptInfo,
         val logPath: String,
    -    val name: String,
    -    val appId: String,
    -    attemptId: Option[String],
    -    startTime: Long,
    -    endTime: Long,
    -    lastUpdated: Long,
    -    sparkUser: String,
    -    completed: Boolean,
    -    val fileSize: Long,
    -    appSparkVersion: String)
    -  extends ApplicationAttemptInfo(
    -      attemptId, startTime, endTime, lastUpdated, sparkUser, completed, appSparkVersion) {
    -
    -  /** extend the superclass string value with the extra attributes of this class */
    -  override def toString: String = {
    -    s"FsApplicationAttemptInfo($name, $appId," +
    -      s" ${super.toString}, source=$logPath, size=$fileSize"
    +    val fileSize: Long) {
    +
    +  def toAppAttemptInfo(): ApplicationAttemptInfo = {
    +    ApplicationAttemptInfo(info.attemptId, info.startTime.getTime(),
    +      info.endTime.getTime(), info.lastUpdated.getTime(), info.sparkUser,
    +      info.completed, info.appSparkVersion)
       }
    +
     }
     
    -/**
    - * Application history information
    - * @param id application ID
    - * @param name application name
    - * @param attempts list of attempts, most recent first.
    - */
    -private class FsApplicationHistoryInfo(
    -    id: String,
    -    override val name: String,
    -    override val attempts: List[FsApplicationAttemptInfo])
    -  extends ApplicationHistoryInfo(id, name, attempts)
    +private[history] class ApplicationInfoWrapper(
    +    val info: v1.ApplicationInfo,
    +    val attempts: List[AttemptInfoWrapper]) {
    +
    +  @JsonIgnore @KVIndexParam
    --- End diff --
    
    `@KVIndexParam`? Where are we using this index?
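
    For reference, my understanding is that `KVIndexParam` is an alias for `KVIndex @getter`, marking the annotated member as the object's natural key in the kvstore listing database; `read()` and `write()` then address objects by that key even when no explicit `view().index(...)` query appears in this file. A minimal sketch of the mechanism, using the real `InMemoryStore` / `KVIndex` API but a made-up `LogInfoExample` class:

    ```scala
    import scala.annotation.meta.getter

    import org.apache.spark.util.kvstore.{InMemoryStore, KVIndex}

    // Hypothetical stand-in for this PR's LogInfo. The @getter meta-annotation
    // moves KVIndex onto the generated getter method, which is what the
    // KVIndexParam alias accomplishes for constructor parameters.
    class LogInfoExample(
        @(KVIndex @getter) val logPath: String,
        val fileSize: Long)

    object KVIndexDemo {
      def main(args: Array[String]): Unit = {
        val store = new InMemoryStore()
        store.write(new LogInfoExample("/tmp/app-1", 1024L))

        // read() resolves the object by its natural (@KVIndex) key.
        val info = store.read(classOf[LogInfoExample], "/tmp/app-1")
        assert(info.fileSize == 1024L)
        store.close()
      }
    }
    ```

    So the annotation here may exist simply so `listing.read(...)` / `listing.write(...)` can key `ApplicationInfoWrapper` by its id, rather than to serve an explicit index query.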

