Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/18887#discussion_r141068700
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -722,75 +640,215 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
appId: String,
attemptId: Option[String],
prevFileSize: Long)(): Boolean = {
- lookup(appId, attemptId) match {
- case None =>
+ try {
+ val attempt = getAttempt(appId, attemptId)
+ val logPath = fs.makeQualified(new Path(logDir, attempt.logPath))
+ recordedFileSize(logPath) > prevFileSize
+ } catch {
+ case _: NoSuchElementException =>
logDebug(s"Application Attempt $appId/$attemptId not found")
false
- case Some(latest) =>
- prevFileSize < latest.fileSize
}
}
-}
-private[history] object FsHistoryProvider {
- val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+ /**
+ * Return the last known size of the given event log, recorded the last time the file
+ * system scanner detected a change in the file.
+ */
+ private def recordedFileSize(log: Path): Long = {
+ try {
+ listing.read(classOf[LogInfo], log.toString()).fileSize
+ } catch {
+ case _: NoSuchElementException => 0L
+ }
+ }
+
+ private def load(appId: String): ApplicationInfoWrapper = {
+ listing.read(classOf[ApplicationInfoWrapper], appId)
+ }
+
+ /**
+ * Write the app's information to the given store. Serialized to avoid the (notedly rare) case
+ * where two threads are processing separate attempts of the same application.
+ */
+ private def addListing(app: ApplicationInfoWrapper): Unit = listing.synchronized {
+ val attempt = app.attempts.head
+
+ val oldApp = try {
+ listing.read(classOf[ApplicationInfoWrapper], app.id)
+ } catch {
+ case _: NoSuchElementException =>
+ app
+ }
+
+ def compareAttemptInfo(a1: AttemptInfoWrapper, a2: AttemptInfoWrapper): Boolean = {
+ a1.info.startTime.getTime() > a2.info.startTime.getTime()
+ }
+
+ val attempts = oldApp.attempts.filter(_.info.attemptId != attempt.info.attemptId) ++
+ List(attempt)
+
+ val newAppInfo = new ApplicationInfoWrapper(
+ app.info,
+ attempts.sortWith(compareAttemptInfo))
+ listing.write(newAppInfo)
+ }
- private val NOT_STARTED = "<Not Started>"
+ /** For testing. Returns internal data about a single attempt. */
+ private[history] def getAttempt(appId: String, attemptId: Option[String]): AttemptInfoWrapper = {
+ load(appId).attempts.find(_.info.attemptId == attemptId).getOrElse(
+ throw new NoSuchElementException(s"Cannot find attempt $attemptId of
$appId."))
+ }
+}
+
+private[history] object FsHistoryProvider {
private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads"
private val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\""
private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\""
private val LOG_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerLogStart\""
+
+ /**
+ * Current version of the data written to the listing database. When opening an existing
+ * db, if the version does not match this value, the FsHistoryProvider will throw away
+ * all data and re-generate the listing data from the event logs.
+ */
+ private[history] val CURRENT_LISTING_VERSION = 1L
}
/**
- * Application attempt information.
- *
- * @param logPath path to the log file, or, for a legacy log, its directory
- * @param name application name
- * @param appId application ID
- * @param attemptId optional attempt ID
- * @param startTime start time (from playback)
- * @param endTime end time (from playback). -1 if the application is incomplete.
- * @param lastUpdated the modification time of the log file when this entry was built by replaying
- * the history.
- * @param sparkUser user running the application
- * @param completed flag to indicate whether or not the application has completed.
- * @param fileSize the size of the log file the last time the file was scanned for changes
+ * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as
+ * the API serializer.
*/
-private class FsApplicationAttemptInfo(
+private class KVStoreScalaSerializer extends KVStoreSerializer {
+
+ mapper.registerModule(DefaultScalaModule)
+ mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
+ mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat)
+
+}
+
+private[history] case class KVStoreMetadata(
+ version: Long,
+ logDir: String)
+
+private[history] case class LogInfo(
+ @KVIndexParam logPath: String,
+ fileSize: Long)
+
+private[history] class AttemptInfoWrapper(
--- End diff ---
why doesn't this one have an index?
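
For context (an illustrative aside, not part of the PR): @KVIndexParam appears to mark the natural key a type is read back by — LogInfo is keyed by logPath, and load() reads ApplicationInfoWrapper by app id. AttemptInfoWrapper is never read from the store directly; getAttempt loads the parent application and scans its attempts list, which may be why it carries no index. A rough plain-Scala sketch of that nested-lookup pattern, using simplified hypothetical types rather than the real kvstore API:

import scala.collection.mutable

// Simplified stand-ins for the wrapper types in the diff (hypothetical,
// for illustration only).
case class Attempt(attemptId: Option[String], fileSize: Long)
case class App(id: String, attempts: List[Attempt])

object NestedLookupSketch {
  // One map per top-level, indexed type: the keyed field acts as the index.
  private val apps = mutable.Map.empty[String, App]

  def main(args: Array[String]): Unit = {
    apps("app-1") = App("app-1", List(Attempt(Some("1"), 1024L)))

    // Attempts carry no key of their own: they are reached by reading the
    // parent application first, mirroring getAttempt in the diff.
    val attempt = apps("app-1").attempts.find(_.attemptId == Some("1"))
    println(attempt) // Some(Attempt(Some(1),1024))
  }
}

If that reading is right, fetching one attempt always costs one application read plus a scan of its (small) attempt list, keeping the store schema flat at the price of not being able to query attempts independently.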