Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/9571#discussion_r67242259
--- Diff:
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -278,55 +304,63 @@ private[history] class FsHistoryProvider(conf:
SparkConf, clock: Clock)
* applications that haven't been updated since last time the logs were
checked.
*/
private[history] def checkForLogs(): Unit = {
- try {
- val newLastScanTime = getNewLastScanTime()
- logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
- val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
- .getOrElse(Seq[FileStatus]())
- // scan for modified applications, replay and merge them
- val logInfos: Seq[FileStatus] = statusList
- .filter { entry =>
- try {
- val prevFileSize =
fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
- !entry.isDirectory() && prevFileSize < entry.getLen()
- } catch {
- case e: AccessControlException =>
- // Do not use "logInfo" since these messages can get pretty
noisy if printed on
- // every poll.
- logDebug(s"No permission to read $entry, ignoring.")
- false
+ metrics.updateCount.inc()
+ metrics.updateLastAttempted.touch()
+ time(metrics.updateTimer) {
+ try {
+ val newLastScanTime = getNewLastScanTime()
+ logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
+ val statusList = Option(fs.listStatus(new
Path(logDir))).map(_.toSeq)
+ .getOrElse(Seq[FileStatus]())
+ // scan for modified applications, replay and merge them
+ val logInfos: Seq[FileStatus] = statusList
+ .filter { entry =>
+ try {
+ val prevFileSize =
fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
+ !entry.isDirectory() && prevFileSize < entry.getLen()
+ } catch {
+ case e: AccessControlException =>
+ // Do not use "logInfo" since these messages can get
pretty noisy if printed on
+ // every poll.
+ logDebug(s"No permission to read $entry, ignoring.")
+ false
+ }
}
+ .flatMap { entry => Some(entry) }
+ .sortWith { case (entry1, entry2) =>
+ entry1.getModificationTime() >= entry2.getModificationTime()
}
- .flatMap { entry => Some(entry) }
- .sortWith { case (entry1, entry2) =>
- entry1.getModificationTime() >= entry2.getModificationTime()
- }
- if (logInfos.nonEmpty) {
- logDebug(s"New/updated attempts found: ${logInfos.size}
${logInfos.map(_.getPath)}")
- }
- logInfos.map { file =>
- replayExecutor.submit(new Runnable {
- override def run(): Unit = mergeApplicationListing(file)
- })
+ if (logInfos.nonEmpty) {
+ logDebug(s"New/updated attempts found: ${logInfos.size}
${logInfos.map(_.getPath)}")
}
- .foreach { task =>
- try {
- // Wait for all tasks to finish. This makes sure that
checkForLogs
- // is not scheduled again while some tasks are already running
in
- // the replayExecutor.
- task.get()
- } catch {
- case e: InterruptedException =>
- throw e
- case e: Exception =>
- logError("Exception while merging application listings", e)
+ logInfos.map { file =>
+ replayExecutor.submit(new Runnable {
+ override def run(): Unit =
time(metrics.mergeApplicationListingTimer) {
--- End diff --
I commented later on the event-count metric update, but this timer would be
more useful if you knew the event count it relates to, and if it were more
targeted (i.e. timing just the replay, not including the time taken to merge
the internal lists).
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]