Github user mgaido91 commented on a diff in the pull request:
https://github.com/apache/spark/pull/21895#discussion_r207136973
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -461,32 +462,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
        logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.getPath)}")
      }
-     val tasks = updated.map { entry =>
+     val tasks = updated.flatMap { entry =>
        try {
-         replayExecutor.submit(new Runnable {
+         val task: Future[Unit] = replayExecutor.submit(new Runnable {
            override def run(): Unit = mergeApplicationListing(entry, newLastScanTime, true)
-         })
+         }, Unit)
+         Some(task -> entry.getPath)
        } catch {
          // let the iteration over the updated entries break, since an exception on
          // replayExecutor.submit (..) indicates the ExecutorService is unable
          // to take any more submissions at this time
          case e: Exception =>
            logError(s"Exception while submitting event log for replay", e)
-           null
+           None
        }
-     }.filter(_ != null)
+     }
      pendingReplayTasksCount.addAndGet(tasks.size)
      // Wait for all tasks to finish. This makes sure that checkForLogs
      // is not scheduled again while some tasks are already running in
      // the replayExecutor.
-     tasks.foreach { task =>
+     tasks.foreach { case (task, path) =>
        try {
          task.get()
        } catch {
          case e: InterruptedException =>
            throw e
+         case e: ExecutionException if e.getCause.isInstanceOf[AccessControlException] =>
+           // We don't have read permissions on the log file
+           logDebug(s"Unable to read log $path", e.getCause)
--- End diff ---
Sure, will do, thanks
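
The diff above combines three small patterns that are easy to miss when read together: ExecutorService.submit(Runnable, result) returns a typed Future, each Future is paired with the path of the log it processes, and Future.get() wraps any exception thrown inside run() in an ExecutionException whose cause can then be matched. Below is a minimal, self-contained Scala sketch of those patterns, not Spark's actual code: the paths and the deliberately failing task are invented, println stands in for logDebug, and java.security.AccessControlException stands in for the Hadoop AccessControlException used in the real code so the sketch runs without Hadoop on the classpath.

import java.security.AccessControlException
import java.util.concurrent.{ExecutionException, Executors, Future}

object ReplayPatternSketch {
  def main(args: Array[String]): Unit = {
    val executor = Executors.newFixedThreadPool(2)
    // Hypothetical log paths; the real code iterates over event log entries.
    val paths = Seq("app-1.log", "app-2.log")

    // flatMap + Option replaces the old map + filter(_ != null): a rejected
    // submission contributes nothing instead of a null placeholder.
    val tasks: Seq[(Future[Unit], String)] = paths.flatMap { path =>
      try {
        // submit(Runnable, result) yields a typed Future[Unit] instead of Future[_].
        val task: Future[Unit] = executor.submit(new Runnable {
          override def run(): Unit = {
            // Simulate a log file we lack permission to read.
            if (path == "app-2.log") {
              throw new AccessControlException(s"Permission denied: $path")
            }
          }
        }, ())
        Some(task -> path)
      } catch {
        case _: Exception => None // the executor refused the task; skip this entry
      }
    }

    tasks.foreach { case (task, path) =>
      try {
        task.get()
      } catch {
        case e: InterruptedException =>
          throw e
        // Future.get wraps anything thrown inside run() in an ExecutionException,
        // so the interesting failure sits on getCause.
        case e: ExecutionException if e.getCause.isInstanceOf[AccessControlException] =>
          println(s"Unable to read log $path: ${e.getCause.getMessage}")
      }
    }
    executor.shutdown()
  }
}

Run as-is, the sketch prints the "Unable to read log app-2.log" line while the other task completes silently, mirroring what the new ExecutionException branch in the diff does for unreadable event logs.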
---