Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18887#discussion_r141120995
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -722,75 +640,215 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           appId: String,
           attemptId: Option[String],
           prevFileSize: Long)(): Boolean = {
    -    lookup(appId, attemptId) match {
    -      case None =>
    +    try {
    +      val attempt = getAttempt(appId, attemptId)
    +      val logPath = fs.makeQualified(new Path(logDir, attempt.logPath))
    +      recordedFileSize(logPath) > prevFileSize
    +    } catch {
    +      case _: NoSuchElementException =>
             logDebug(s"Application Attempt $appId/$attemptId not found")
             false
    -      case Some(latest) =>
    -        prevFileSize < latest.fileSize
         }
       }
    -}
     
    -private[history] object FsHistoryProvider {
    -  val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
    +  /**
    +   * Return the last known size of the given event log, recorded the last time the file
    +   * system scanner detected a change in the file.
    +   */
    +  private def recordedFileSize(log: Path): Long = {
    +    try {
    +      listing.read(classOf[LogInfo], log.toString()).fileSize
    +    } catch {
    +      case _: NoSuchElementException => 0L
    +    }
    +  }
    +
    +  private def load(appId: String): ApplicationInfoWrapper = {
    +    listing.read(classOf[ApplicationInfoWrapper], appId)
    +  }
    +
    +  /**
    +   * Write the app's information to the given store. Serialized to avoid the (notedly rare) case
    +   * where two threads are processing separate attempts of the same application.
    +   */
    +  private def addListing(app: ApplicationInfoWrapper): Unit = listing.synchronized {
    +    val attempt = app.attempts.head
    +
    +    val oldApp = try {
    +      listing.read(classOf[ApplicationInfoWrapper], app.id)
    +    } catch {
    +      case _: NoSuchElementException =>
    +        app
    +    }
    +
    +    def compareAttemptInfo(a1: AttemptInfoWrapper, a2: AttemptInfoWrapper): Boolean = {
    +      a1.info.startTime.getTime() > a2.info.startTime.getTime()
    +    }
    +
    +    val attempts = oldApp.attempts.filter(_.info.attemptId != attempt.info.attemptId) ++
    +      List(attempt)
    +
    +    val newAppInfo = new ApplicationInfoWrapper(
    +      app.info,
    +      attempts.sortWith(compareAttemptInfo))
    +    listing.write(newAppInfo)
    +  }
     
    -  private val NOT_STARTED = "<Not Started>"
    +  /** For testing. Returns internal data about a single attempt. */
    +  private[history] def getAttempt(appId: String, attemptId: Option[String]): AttemptInfoWrapper = {
    +    load(appId).attempts.find(_.info.attemptId == attemptId).getOrElse(
    +      throw new NoSuchElementException(s"Cannot find attempt $attemptId of $appId."))
    +  }
     
    +}
    +
    +private[history] object FsHistoryProvider {
       private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads"
     
       private val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\""
     
       private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\""
     
       private val LOG_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerLogStart\""
    +
    +  /**
    +   * Current version of the data written to the listing database. When opening an existing
    +   * db, if the version does not match this value, the FsHistoryProvider will throw away
    +   * all data and re-generate the listing data from the event logs.
    +   */
    +  private[history] val CURRENT_LISTING_VERSION = 1L
     }
     
     /**
    - * Application attempt information.
    - *
    - * @param logPath path to the log file, or, for a legacy log, its directory
    - * @param name application name
    - * @param appId application ID
    - * @param attemptId optional attempt ID
    - * @param startTime start time (from playback)
    - * @param endTime end time (from playback). -1 if the application is incomplete.
    - * @param lastUpdated the modification time of the log file when this entry was built by replaying
    - *                    the history.
    - * @param sparkUser user running the application
    - * @param completed flag to indicate whether or not the application has completed.
    - * @param fileSize the size of the log file the last time the file was scanned for changes
    + * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as
    + * the API serializer.
      */
    -private class FsApplicationAttemptInfo(
    +private class KVStoreScalaSerializer extends KVStoreSerializer {
    +
    +  mapper.registerModule(DefaultScalaModule)
    +  mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
    +  mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat)
    +
    +}
    +
    +private[history] case class KVStoreMetadata(
    +  version: Long,
    +  logDir: String)
    +
    +private[history] case class LogInfo(
    +  @KVIndexParam logPath: String,
    +  fileSize: Long)
    +
    +private[history] class AttemptInfoWrapper(
    --- End diff --
    
    Because it's not directly written to the store.
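    To illustrate with a simplified, hypothetical sketch (plain Scala collections standing in for the actual KVStore API): attempts ride along as a nested field of the application record, so only the application wrapper needs a store key, and an attempt is found by loading its application first, mirroring getAttempt() in the diff above.

        // Hypothetical stand-in for the listing store, keyed only by application id.
        case class Attempt(attemptId: Option[String], startTime: Long)
        case class AppWrapper(id: String, attempts: List[Attempt])  // `id` acts as the key

        val store = scala.collection.mutable.Map[String, AppWrapper]()

        // The whole application record, including its nested attempts, is written at once.
        def write(app: AppWrapper): Unit = store(app.id) = app
        def read(appId: String): AppWrapper = store(appId)

        // Attempts are never written on their own; they are reached through their application.
        def getAttempt(appId: String, attemptId: Option[String]): Attempt =
          read(appId).attempts.find(_.attemptId == attemptId)
            .getOrElse(throw new NoSuchElementException(s"Cannot find attempt $attemptId of $appId."))

        // Example (Scala REPL): write(AppWrapper("app-1", List(Attempt(Some("1"), 0L))))
        //                       getAttempt("app-1", Some("1"))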

