vanzin closed pull request #20575: [SPARK-23386][DEPLOY] enable direct
application links in SHS before replay
URL: https://github.com/apache/spark/pull/20575
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index bf1eeb0c1bf59..40467b5cddf57 100644
---
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -27,7 +27,7 @@ import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.io.Source
-import scala.util.Try
+import scala.util.{Success, Try}
import scala.xml.Node
import com.fasterxml.jackson.annotation.JsonIgnore
@@ -401,6 +401,52 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
}
}
+ val APP_GRP = "appId"
+ val ATTEMPT_GRP = "attemptId"
+ val appIdRegex = """(application_\d+_\d+)_(\d+).*""".r(APP_GRP, ATTEMPT_GRP)
+ private def updateAppList(statuses: Seq[FileStatus]) = {
+ statuses.foreach { fstat =>
+ logInfo(s"Processing file status $fstat")
+ val logPath = fstat.getPath
+ appIdRegex
+ .findAllMatchIn(logPath.getName)
+ .foreach { m =>
+ val appId = m.group(APP_GRP)
+ val attemptId = m.group(ATTEMPT_GRP)
+
+ Try(load(appId)).recover {
+ case e: NoSuchElementException =>
+ logInfo(s"Generating application event for $appId from $fstat")
+ val attemptInfo = ApplicationAttemptInfo(
+ Some(attemptId),
+ // timestamp will be refined during replay
+ new Date(fstat.getModificationTime),
+ new Date(fstat.getModificationTime),
+ new Date(fstat.getModificationTime),
+ 1000,
+ fstat.getOwner,
+ false,
+ "2.3.0-tbd-by-replay"
+ )
+ val appInfo = new ApplicationInfoWrapper(
+ new ApplicationInfo(appId, "tbd-by-replay-" + logPath.getName,
+ None, None, None, None, Seq(attemptInfo)),
+ List(new AttemptInfoWrapper(attemptInfo, logPath.toString,
fstat.getLen,
+ None, None, None, None)))
+
+ synchronized {
+ activeUIs.get((appId, Some(attemptId))).foreach { ui =>
+ ui.invalidate()
+ ui.ui.store.close()
+ }
+ }
+ listing.write(appInfo)
+ Success(appInfo)
+ }
+ }
+ }
+ }
+
/**
* Builds the application list based on the current contents of the log
directory.
* Tries to reuse as much of the data already in memory as possible, by not
reading
@@ -460,7 +506,7 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
if (updated.nonEmpty) {
logDebug(s"New/updated attempts found: ${updated.size}
${updated.map(_.getPath)}")
}
-
+ updateAppList(updated)
val tasks = updated.map { entry =>
try {
replayExecutor.submit(new Runnable {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]