Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/18887#discussion_r140689773
--- Diff:
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -742,53 +703,150 @@ private[history] object FsHistoryProvider {
private val APPL_END_EVENT_PREFIX =
"{\"Event\":\"SparkListenerApplicationEnd\""
private val LOG_START_EVENT_PREFIX =
"{\"Event\":\"SparkListenerLogStart\""
+
+ /**
+ * Current version of the data written to the listing database. When
opening an existing
+ * db, if the version does not match this value, the FsHistoryProvider
will throw away
+ * all data and re-generate the listing data from the event logs.
+ */
+ private val CURRENT_LISTING_VERSION = 1L
}
/**
- * Application attempt information.
- *
- * @param logPath path to the log file, or, for a legacy log, its directory
- * @param name application name
- * @param appId application ID
- * @param attemptId optional attempt ID
- * @param startTime start time (from playback)
- * @param endTime end time (from playback). -1 if the application is
incomplete.
- * @param lastUpdated the modification time of the log file when this
entry was built by replaying
- * the history.
- * @param sparkUser user running the application
- * @param completed flag to indicate whether or not the application has
completed.
- * @param fileSize the size of the log file the last time the file was
scanned for changes
+ * A KVStoreSerializer that provides Scala types serialization too, and
uses the same options as
+ * the API serializer.
*/
-private class FsApplicationAttemptInfo(
+private class KVStoreScalaSerializer extends KVStoreSerializer {
+
+ mapper.registerModule(DefaultScalaModule)
+ mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
+ mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat)
+
+}
+
+private[history] case class KVStoreMetadata(
+ version: Long,
+ logDir: String)
+
+private[history] case class LogInfo(
+ @KVIndexParam logPath: String,
+ fileSize: Long)
+
+private[history] class AttemptInfoWrapper(
+ val info: v1.ApplicationAttemptInfo,
val logPath: String,
- val name: String,
- val appId: String,
- attemptId: Option[String],
- startTime: Long,
- endTime: Long,
- lastUpdated: Long,
- sparkUser: String,
- completed: Boolean,
- val fileSize: Long,
- appSparkVersion: String)
- extends ApplicationAttemptInfo(
- attemptId, startTime, endTime, lastUpdated, sparkUser, completed,
appSparkVersion) {
-
- /** extend the superclass string value with the extra attributes of this
class */
- override def toString: String = {
- s"FsApplicationAttemptInfo($name, $appId," +
- s" ${super.toString}, source=$logPath, size=$fileSize"
+ val fileSize: Long) {
+
+ def toAppAttemptInfo(): ApplicationAttemptInfo = {
+ ApplicationAttemptInfo(info.attemptId, info.startTime.getTime(),
+ info.endTime.getTime(), info.lastUpdated.getTime(), info.sparkUser,
+ info.completed, info.appSparkVersion)
}
+
}
-/**
- * Application history information
- * @param id application ID
- * @param name application name
- * @param attempts list of attempts, most recent first.
- */
-private class FsApplicationHistoryInfo(
- id: String,
- override val name: String,
- override val attempts: List[FsApplicationAttemptInfo])
- extends ApplicationHistoryInfo(id, name, attempts)
+private[history] class ApplicationInfoWrapper(
+ val info: v1.ApplicationInfo,
+ val attempts: List[AttemptInfoWrapper]) {
+
+ @JsonIgnore @KVIndexParam
+ def id: String = info.id
+
+ @JsonIgnore @KVIndexParam("endTime")
+ def endTime(): Long = attempts.head.info.endTime.getTime()
+
+ @JsonIgnore @KVIndexParam("oldestAttempt")
+ def oldestAttempt(): Long =
attempts.map(_.info.lastUpdated.getTime()).min
+
+ def toAppHistoryInfo(): ApplicationHistoryInfo = {
+ ApplicationHistoryInfo(info.id, info.name,
attempts.map(_.toAppAttemptInfo()))
+ }
+
+ def toApiInfo(): v1.ApplicationInfo = {
+ new v1.ApplicationInfo(info.id, info.name, info.coresGranted,
info.maxCores,
+ info.coresPerExecutor, info.memoryPerExecutorMB,
attempts.map(_.info))
+ }
+
+}
+
+private[history] class AppListingListener(log: FileStatus, clock: Clock)
extends SparkListener {
+
+ private val app = new MutableApplicationInfo()
+ private val attempt = new MutableAttemptInfo(log.getPath().getName(),
log.getLen())
+
+ override def onApplicationStart(event: SparkListenerApplicationStart):
Unit = {
+ app.id = event.appId.orNull
--- End diff --
Do we support reading logs generated by Spark 1.6?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]