Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18887#discussion_r133517697
  
    --- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -742,53 +696,146 @@ private[history] object FsHistoryProvider {
       private val APPL_END_EVENT_PREFIX = 
"{\"Event\":\"SparkListenerApplicationEnd\""
     
       private val LOG_START_EVENT_PREFIX = 
"{\"Event\":\"SparkListenerLogStart\""
    +
    +  /** Current version of the data written to the listing database. */
    +  private val CURRENT_LISTING_VERSION = 1L
     }
     
     /**
    - * Application attempt information.
    - *
    - * @param logPath path to the log file, or, for a legacy log, its directory
    - * @param name application name
    - * @param appId application ID
    - * @param attemptId optional attempt ID
    - * @param startTime start time (from playback)
    - * @param endTime end time (from playback). -1 if the application is 
incomplete.
    - * @param lastUpdated the modification time of the log file when this 
entry was built by replaying
    - *                    the history.
    - * @param sparkUser user running the application
    - * @param completed flag to indicate whether or not the application has 
completed.
    - * @param fileSize the size of the log file the last time the file was 
scanned for changes
    + * A KVStoreSerializer that provides Scala types serialization too, and 
uses the same options as
    + * the API serializer.
      */
    -private class FsApplicationAttemptInfo(
    +private class KVStoreScalaSerializer extends KVStoreSerializer {
    +
    +  mapper.registerModule(DefaultScalaModule)
    +  mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
    +  mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat)
    +
    +}
    +
    +private[history] case class KVStoreMetadata(
    +  val version: Long,
    +  val logDir: String)
    +
    +private[history] case class LogInfo(
    +  @KVIndexParam val logPath: String,
    +  val fileSize: Long)
    +
    +private[history] class AttemptInfoWrapper(
    +    val info: v1.ApplicationAttemptInfo,
         val logPath: String,
    -    val name: String,
    -    val appId: String,
    -    attemptId: Option[String],
    -    startTime: Long,
    -    endTime: Long,
    -    lastUpdated: Long,
    -    sparkUser: String,
    -    completed: Boolean,
    -    val fileSize: Long,
    -    appSparkVersion: String)
    -  extends ApplicationAttemptInfo(
    -      attemptId, startTime, endTime, lastUpdated, sparkUser, completed, 
appSparkVersion) {
    -
    -  /** extend the superclass string value with the extra attributes of this 
class */
    -  override def toString: String = {
    -    s"FsApplicationAttemptInfo($name, $appId," +
    -      s" ${super.toString}, source=$logPath, size=$fileSize"
    +    val fileSize: Long) {
    +
    +  def toAppAttemptInfo(): ApplicationAttemptInfo = {
    +    ApplicationAttemptInfo(info.attemptId, info.startTime.getTime(),
    +      info.endTime.getTime(), info.lastUpdated.getTime(), info.sparkUser,
    +      info.completed, info.appSparkVersion)
       }
    +
     }
     
    -/**
    - * Application history information
    - * @param id application ID
    - * @param name application name
    - * @param attempts list of attempts, most recent first.
    - */
    -private class FsApplicationHistoryInfo(
    -    id: String,
    -    override val name: String,
    -    override val attempts: List[FsApplicationAttemptInfo])
    -  extends ApplicationHistoryInfo(id, name, attempts)
    +private[history] class ApplicationInfoWrapper(
    +    val info: v1.ApplicationInfo,
    +    val attempts: List[AttemptInfoWrapper]) {
    +
    +  @JsonIgnore @KVIndexParam
    +  def id: String = info.id
    +
    +  @JsonIgnore @KVIndexParam("endTime")
    +  def endTime(): Long = attempts.head.info.endTime.getTime()
    +
    +  @JsonIgnore @KVIndexParam("oldestAttempt")
    +  def oldestAttempt(): Long = 
attempts.map(_.info.lastUpdated.getTime()).min
    +
    +  def toAppHistoryInfo(): ApplicationHistoryInfo = {
    +    ApplicationHistoryInfo(info.id, info.name, 
attempts.map(_.toAppAttemptInfo()))
    +  }
    +
    +  def toApiInfo(): v1.ApplicationInfo = {
    +    new v1.ApplicationInfo(info.id, info.name, info.coresGranted, 
info.maxCores,
    +      info.coresPerExecutor, info.memoryPerExecutorMB, 
attempts.map(_.info))
    +  }
    +
    +}
    +
    +private[history] class AppListingListener(log: FileStatus, clock: Clock) 
extends SparkListener {
    +
    +  private val app = new MutableApplicationInfo()
    +  private val attempt = new MutableAttemptInfo(log.getPath().getName(), 
log.getLen())
    +
    +  override def onApplicationStart(event: SparkListenerApplicationStart): 
Unit = {
    +    app.id = event.appId.orNull
    +    app.name = event.appName
    +
    +    attempt.attemptId = event.appAttemptId
    +    attempt.startTime = new Date(event.time)
    +    attempt.lastUpdated = new Date(clock.getTimeMillis())
    +    attempt.sparkUser = event.sparkUser
    +  }
    +
    +  override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit 
= {
    +    attempt.endTime = new Date(event.time)
    +    attempt.lastUpdated = new Date(log.getModificationTime())
    +    attempt.duration = event.time - attempt.startTime.getTime()
    +    attempt.completed = true
    +  }
    +
    +  override def onOtherEvent(event: SparkListenerEvent): Unit = event match 
{
    +    case SparkListenerLogStart(sparkVersion) =>
    +      attempt.appSparkVersion = sparkVersion
    +    case _ =>
    +  }
    +
    +  def applicationInfo: Option[ApplicationInfoWrapper] = {
    +    if (app.id != null) {
    +      Some(app.toView(List(attempt.toView())))
    +    } else {
    +      None
    +    }
    +  }
    +
    +  private class MutableApplicationInfo {
    +    var id: String = null
    +    var name: String = null
    +    var coresGranted: Option[Int] = None
    +    var maxCores: Option[Int] = None
    +    var coresPerExecutor: Option[Int] = None
    +    var memoryPerExecutorMB: Option[Int] = None
    +
    +    def toView(attempts: List[AttemptInfoWrapper]): ApplicationInfoWrapper 
= {
    --- End diff --
    
    Looks like this is only ever called with a single `AttemptInfoWrapper`, so
it would be simpler to take that directly and remove the `List`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to