Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/18887#discussion_r139468045
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -720,19 +633,67 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
appId: String,
attemptId: Option[String],
prevFileSize: Long)(): Boolean = {
- lookup(appId, attemptId) match {
- case None =>
- logDebug(s"Application Attempt $appId/$attemptId not found")
- false
- case Some(latest) =>
- prevFileSize < latest.fileSize
+ try {
+ val attempt = getAttempt(appId, attemptId)
+ val logPath = fs.makeQualified(new Path(logDir, attempt.logPath))
+ recordedFileSize(logPath) > prevFileSize
+ } catch {
+ case _: NoSuchElementException => false
+ }
+ }
+
+ /**
+ * Return the last known size of the given event log, recorded the last time the file
+ * system scanner detected a change in the file.
+ */
+ private def recordedFileSize(log: Path): Long = {
+ try {
+ listing.read(classOf[LogInfo], log.toString()).fileSize
+ } catch {
+ case _: NoSuchElementException => 0L
}
}
+
+ private def load(appId: String): ApplicationInfoWrapper = {
+ listing.read(classOf[ApplicationInfoWrapper], appId)
+ }
+
+ /**
+ * Write the app's information to the given store. Serialized to avoid the (notedly rare) case
+ * where two threads are processing separate attempts of the same application.
+ */
+ private def addListing(app: ApplicationInfoWrapper): Unit = listing.synchronized {
+ val attempt = app.attempts.head
--- End diff ---
Because the listener processes a single event log, and one event log == one attempt.
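
For illustration only, a minimal, self-contained sketch of the kind of merge addListing has to do under the lock. The case classes and the in-memory map below are stand-ins I made up for this sketch, not the PR's ApplicationInfoWrapper / AttemptInfoWrapper or the KVStore-backed listing:

    // Simplified stand-ins for AttemptInfoWrapper / ApplicationInfoWrapper.
    case class Attempt(attemptId: Option[String], startTime: Long)
    case class AppInfo(id: String, attempts: List[Attempt])

    object AddListingSketch {
      // In-memory stand-in for the KVStore-backed `listing`.
      private val store = scala.collection.mutable.Map[String, AppInfo]()

      // The incoming wrapper carries exactly one attempt (one event log == one
      // attempt), so `attempts.head` is the attempt being added; it replaces
      // any previously stored attempt with the same attemptId.
      def addListing(app: AppInfo): Unit = store.synchronized {
        val attempt = app.attempts.head
        val old = store.getOrElse(app.id, app.copy(attempts = Nil))
        val merged = (old.attempts.filterNot(_.attemptId == attempt.attemptId) :+ attempt)
          .sortBy(a => -a.startTime)
        store(app.id) = app.copy(attempts = merged)
      }
    }

The sketch only shows the shape of the merge (replace by attemptId, keep attempts ordered); the actual method presumably writes the merged wrapper back through the listing store rather than into a plain map.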