Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/9571#discussion_r111831406
--- Diff:
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -310,77 +338,87 @@ private[history] class FsHistoryProvider(conf:
SparkConf, clock: Clock)
* applications that haven't been updated since last time the logs were
checked.
*/
private[history] def checkForLogs(): Unit = {
- try {
- val newLastScanTime = getNewLastScanTime()
- logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
- val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
- .getOrElse(Seq[FileStatus]())
- // scan for modified applications, replay and merge them
- val logInfos: Seq[FileStatus] = statusList
- .filter { entry =>
- try {
- val prevFileSize =
fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
- !entry.isDirectory() &&
- // FsHistoryProvider generates a hidden file which can't be
read. Accidentally
- // reading a garbage file is safe, but we would log an error
which can be scary to
- // the end-user.
- !entry.getPath().getName().startsWith(".") &&
- prevFileSize < entry.getLen()
- } catch {
- case e: AccessControlException =>
- // Do not use "logInfo" since these messages can get pretty
noisy if printed on
- // every poll.
- logDebug(s"No permission to read $entry, ignoring.")
- false
+ metrics.updateCount.inc()
+ metrics.updateLastAttempted.touch()
+ time(metrics.updateTimer) {
+ try {
+ val newLastScanTime = getNewLastScanTime()
+ logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
+ val statusList = Option(fs.listStatus(new
Path(logDir))).map(_.toSeq)
+ .getOrElse(Seq[FileStatus]())
+ // scan for modified applications, replay and merge them
+ val logInfos: Seq[FileStatus] = statusList
+ .filter { entry =>
+ try {
+ val prevFileSize =
fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
+ !entry.isDirectory() &&
+ // FsHistoryProvider generates a hidden file which can't
be read. Accidentally
+ // reading a garbage file is safe, but we would log an
error which can be scary to
+ // the end-user.
+ !entry.getPath().getName().startsWith(".") &&
+ prevFileSize < entry.getLen()
+ } catch {
+ case e: AccessControlException =>
+ // Do not use "logInfo" since these messages can get
pretty noisy if printed on
+ // every poll.
+ logDebug(s"No permission to read $entry, ignoring.")
+ false
+ }
+ }
+ .flatMap { entry => Some(entry) }
+ .sortWith { case (entry1, entry2) =>
+ entry1.getModificationTime() >= entry2.getModificationTime()
}
- }
- .flatMap { entry => Some(entry) }
- .sortWith { case (entry1, entry2) =>
- entry1.getModificationTime() >= entry2.getModificationTime()
- }
if (logInfos.nonEmpty) {
logDebug(s"New/updated attempts found: ${logInfos.size}
${logInfos.map(_.getPath)}")
}
- var tasks = mutable.ListBuffer[Future[_]]()
-
- try {
- for (file <- logInfos) {
- tasks += replayExecutor.submit(new Runnable {
- override def run(): Unit = mergeApplicationListing(file)
- })
- }
- } catch {
- // let the iteration over logInfos break, since an exception on
- // replayExecutor.submit (..) indicates the ExecutorService is
unable
- // to take any more submissions at this time
-
- case e: Exception =>
- logError(s"Exception while submitting event log for replay", e)
- }
-
- pendingReplayTasksCount.addAndGet(tasks.size)
+ var tasks = mutable.ListBuffer[Future[_]]()
- tasks.foreach { task =>
try {
- // Wait for all tasks to finish. This makes sure that
checkForLogs
- // is not scheduled again while some tasks are already running in
- // the replayExecutor.
- task.get()
+ for (file <- logInfos) {
+ tasks += replayExecutor.submit(new Runnable {
+ override def run(): Unit =
+ time(metrics.historyMergeTimer,
Some(metrics.historyTotalMergeTime)) {
+ mergeApplicationListing(file)
+ }
+ })
+ }
} catch {
- case e: InterruptedException =>
- throw e
+ // let the iteration over logInfos break, since an exception on
+ // replayExecutor.submit (..) indicates the ExecutorService is
unable
+ // to take any more submissions at this time
+
case e: Exception =>
- logError("Exception while merging application listings", e)
- } finally {
- pendingReplayTasksCount.decrementAndGet()
+ logError(s"Exception while submitting event log for replay", e)
}
- }
- lastScanTime.set(newLastScanTime)
- } catch {
- case e: Exception => logError("Exception in checking for event log
updates", e)
+ pendingReplayTasksCount.addAndGet(tasks.size)
+
+ tasks.foreach { task =>
+ try {
+ // Wait for all tasks to finish. This makes sure that
checkForLogs
+ // is not scheduled again while some tasks are already running
in
+ // the replayExecutor.
+ task.get()
+ } catch {
+ case e: InterruptedException =>
+ throw e
+ case e: Exception =>
+ logError("Exception while merging application listings", e)
+ } finally {
+ pendingReplayTasksCount.decrementAndGet()
+ }
+ }
+
+ lastScanTime.set(newLastScanTime)
+ metrics.updateLastSucceeded.setValue(newLastScanTime)
+ } catch {
+ case e: Exception => logError(
--- End diff --
`logError` goes on the next line.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and you wish it to be, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA
ticket with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]