Github user mgaido91 commented on a diff in the pull request:
https://github.com/apache/spark/pull/21895#discussion_r207138029
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -973,6 +985,38 @@ private[history] object FsHistoryProvider {
private[history] val CURRENT_LISTING_VERSION = 1L
}
+/**
+ * Manages a blacklist containing the files which cannot be read due to
+ * lack of access permissions.
+ */
+private[history] trait LogFilesBlacklisting extends Logging {
+  protected def clock: Clock
+
+  /**
+   * Contains the name of blacklisted files and their insertion time.
+   */
+  private val blacklist = new ConcurrentHashMap[String, Long]
+
+  private[history] def isBlacklisted(path: Path): Boolean = {
+    blacklist.containsKey(path.getName)
+  }
+
+  private[history] def blacklist(path: Path): Unit = {
+    blacklist.put(path.getName, clock.getTimeMillis())
+  }
+
+  /**
+   * Removes expired entries in the blacklist, according to the provided
+   * `expireTimeInSeconds`.
+   */
+  protected def clearBlacklist(expireTimeInSeconds: Long): Unit = {
+    val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
+    val expired = new mutable.ArrayBuffer[String]
+    blacklist.asScala.foreach {
--- End diff --
I don't think it is needed, as a new collection is built when doing asScala,
so we work on a definite snapshot of the original map.
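
For reference, a minimal standalone sketch (not part of the PR; the file
names, timestamps and threshold below are invented) showing that entries can
be removed while iterating over the asScala view of the ConcurrentHashMap
without a ConcurrentModificationException, since ConcurrentHashMap's
iterators are weakly consistent:

    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.JavaConverters._

    object BlacklistExpirySketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical blacklist: file name -> insertion time in ms.
        val blacklist = new ConcurrentHashMap[String, Long]
        blacklist.put("app-1.inprogress", 0L)      // old entry, should expire
        blacklist.put("app-2.inprogress", 90000L)  // recent entry, should survive

        val expiredThreshold = 60000L

        // ConcurrentHashMap iterators are weakly consistent, so removing
        // entries while iterating over the asScala view does not throw.
        blacklist.asScala.foreach { case (file, insertedAt) =>
          if (insertedAt < expiredThreshold) {
            blacklist.remove(file)
          }
        }

        println(blacklist)  // prints {app-2.inprogress=90000}
      }
    }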
---