[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...

2018-08-07 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/21895#discussion_r208339969
  
--- Diff: core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala ---
@@ -29,9 +29,11 @@ import scala.language.postfixOps
 import com.google.common.io.{ByteStreams, Files}
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.security.AccessControlException
 import org.json4s.jackson.JsonMethods._
-import org.mockito.Matchers.any
-import org.mockito.Mockito.{mock, spy, verify}
+import org.mockito.ArgumentMatcher
+import org.mockito.Matchers.{any, argThat}
+import org.mockito.Mockito.{doThrow, mock, spy, verify, when}
--- End diff --

@mgaido91. Is this the only difference (removing the redundant `doReturn`) from the original PR?


---




[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...

2018-08-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21895


---




[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...

2018-08-03 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21895#discussion_r207641341
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -779,6 +808,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         listing.delete(classOf[LogInfo], log.logPath)
       }
     }
+    // Clean the blacklist from the expired entries.
+    clearBlacklist(CLEAN_INTERVAL_S)
--- End diff --

I misread it as `MAX_LOG_AGE_S`... `CLEAN_INTERVAL_S` should be fine here, you are right.


---




[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...

2018-08-03 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21895#discussion_r207499147
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -779,6 +808,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         listing.delete(classOf[LogInfo], log.logPath)
       }
     }
+    // Clean the blacklist from the expired entries.
+    clearBlacklist(CLEAN_INTERVAL_S)
--- End diff --

This is scheduled anyway every CLEAN_INTERVAL_S, so I don't think that 
changing the value here helps. We could define another config for the 
blacklist expiration, but that seems like overkill to me. I think it is very 
unlikely that a user changes an application's permissions on these files, and 
when they do, they can always restart the SHS. Or we could decide to clean the 
blacklist every X amount of time. Honestly, I don't have a strong opinion on 
which of these options is best.


---




[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...

2018-08-03 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21895#discussion_r207493280
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -161,6 +162,29 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     new HistoryServerDiskManager(conf, path, listing, clock)
   }
 
+  private val blacklist = new ConcurrentHashMap[String, Long]
+
+  // Visible for testing
+  private[history] def isBlacklisted(path: Path): Boolean = {
+    blacklist.containsKey(path.getName)
+  }
+
+  private def blacklist(path: Path): Unit = {
+    blacklist.put(path.getName, clock.getTimeMillis())
+  }
+
+  /**
+   * Removes expired entries in the blacklist, according to the provided `expireTimeInSeconds`.
+   */
+  private def clearBlacklist(expireTimeInSeconds: Long): Unit = {
+    val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
+    val expired = new mutable.ArrayBuffer[String]
+    blacklist.asScala.foreach {
+      case (path, creationTime) if creationTime < expiredThreshold => expired += path
+    }
+    expired.foreach(blacklist.remove(_))
--- End diff --

Instead of this, why not simply:
```
blacklist.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)
```
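
For reference, a self-contained sketch of this suggestion (my illustration, not code from the PR; it assumes Scala 2.11/2.12, where mutable `retain` is available — on 2.13 it is `filterInPlace`):

```
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

// Standalone sketch of the suggested one-liner; timestamps are arbitrary.
val blacklist = new ConcurrentHashMap[String, Long]
blacklist.put("old.log", 1000L)
blacklist.put("new.log", 9000L)

val expiredThreshold = 5000L
// retain mutates the underlying ConcurrentHashMap through the asScala wrapper,
// keeping only the entries that have not yet expired.
blacklist.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)

assert(!blacklist.containsKey("old.log"))
assert(blacklist.containsKey("new.log"))
```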


---




[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...

2018-08-03 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21895#discussion_r207493081
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -779,6 +808,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         listing.delete(classOf[LogInfo], log.logPath)
       }
     }
+    // Clean the blacklist from the expired entries.
+    clearBlacklist(CLEAN_INTERVAL_S)
--- End diff --

My only concern is that, if there happens to be a transient ACL issue when 
initially accessing the file, we will never see it in the application list even 
once the ACL is fixed, short of an SHS restart.
Wondering if the clean interval here could be a fraction of CLEAN_INTERVAL_S, 
so that these files have a chance of making it to the app list, without much 
overhead on the NN.


---




[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...

2018-08-02 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/21895#discussion_r207419217
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -80,8 +80,8 @@ import org.apache.spark.util.kvstore._
  * break. Simple streaming of JSON-formatted events, as is implemented today, implicitly
  * maintains this invariant.
  */
-private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
-  extends ApplicationHistoryProvider with Logging {
+private[history] class FsHistoryProvider(conf: SparkConf, protected val clock: Clock)
+  extends ApplicationHistoryProvider with LogFilesBlacklisting with Logging {
--- End diff --

This doesn't seem necessary; let's inline this trait.


---




[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...

2018-08-02 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21895#discussion_r207146314
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -973,6 +985,38 @@ private[history] object FsHistoryProvider {
   private[history] val CURRENT_LISTING_VERSION = 1L
 }
 
+/**
+ * Manages a blacklist containing the files which cannot be read due to lack of access permissions.
+ */
+private[history] trait LogFilesBlacklisting extends Logging {
+  protected def clock: Clock
+
+  /**
+   * Contains the name of blacklisted files and their insertion time.
+   */
+  private val blacklist = new ConcurrentHashMap[String, Long]
+
+  private[history] def isBlacklisted(path: Path): Boolean = {
+    blacklist.containsKey(path.getName)
+  }
+
+  private[history] def blacklist(path: Path): Unit = {
+    blacklist.put(path.getName, clock.getTimeMillis())
+  }
+
+  /**
+   * Removes expired entries in the blacklist, according to the provided `expireTimeInSeconds`.
+   */
+  protected def clearBlacklist(expireTimeInSeconds: Long): Unit = {
+    val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
+    val expired = new mutable.ArrayBuffer[String]
+    blacklist.asScala.foreach {
--- End diff --

Yes, sorry, you are right; I got confused because I changed this line before 
pushing here and was thinking of my previous implementation. Yes, we are not 
working on a fixed snapshot of the values here, but I think that shouldn't be 
a problem anyway, as we are not updating the values. We may miss processing 
new entries, but I don't think that is an issue. Thanks.


---




[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...

2018-08-02 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/21895#discussion_r207140685
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -973,6 +985,38 @@ private[history] object FsHistoryProvider {
   private[history] val CURRENT_LISTING_VERSION = 1L
 }
 
+/**
+ * Manages a blacklist containing the files which cannot be read due to lack of access permissions.
+ */
+private[history] trait LogFilesBlacklisting extends Logging {
+  protected def clock: Clock
+
+  /**
+   * Contains the name of blacklisted files and their insertion time.
+   */
+  private val blacklist = new ConcurrentHashMap[String, Long]
+
+  private[history] def isBlacklisted(path: Path): Boolean = {
+    blacklist.containsKey(path.getName)
+  }
+
+  private[history] def blacklist(path: Path): Unit = {
+    blacklist.put(path.getName, clock.getTimeMillis())
+  }
+
+  /**
+   * Removes expired entries in the blacklist, according to the provided `expireTimeInSeconds`.
+   */
+  protected def clearBlacklist(expireTimeInSeconds: Long): Unit = {
+    val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
+    val expired = new mutable.ArrayBuffer[String]
+    blacklist.asScala.foreach {
--- End diff --

AFAIK, `asScala` doesn't copy or create a snapshot of the original map; it 
just wraps the original map and provides a Scala API. Changes to the original 
map will also be visible through the object returned by `asScala`.
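
A small standalone sketch (my illustration, not code from the PR) of this live-view behavior:

```
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

// asScala wraps the Java map rather than snapshotting it: writes on either
// side are visible through the other.
val javaMap = new ConcurrentHashMap[String, Long]
val scalaView = javaMap.asScala

javaMap.put("app-1.log", 1L)
println(scalaView.get("app-1.log"))       // Some(1) -- visible through the view

scalaView.put("app-2.log", 2L)
println(javaMap.containsKey("app-2.log")) // true -- visible through the Java map
```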


---




[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...

2018-08-02 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21895#discussion_r207138029
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -973,6 +985,38 @@ private[history] object FsHistoryProvider {
   private[history] val CURRENT_LISTING_VERSION = 1L
 }
 
+/**
+ * Manages a blacklist containing the files which cannot be read due to lack of access permissions.
+ */
+private[history] trait LogFilesBlacklisting extends Logging {
+  protected def clock: Clock
+
+  /**
+   * Contains the name of blacklisted files and their insertion time.
+   */
+  private val blacklist = new ConcurrentHashMap[String, Long]
+
+  private[history] def isBlacklisted(path: Path): Boolean = {
+    blacklist.containsKey(path.getName)
+  }
+
+  private[history] def blacklist(path: Path): Unit = {
+    blacklist.put(path.getName, clock.getTimeMillis())
+  }
+
+  /**
+   * Removes expired entries in the blacklist, according to the provided `expireTimeInSeconds`.
+   */
+  protected def clearBlacklist(expireTimeInSeconds: Long): Unit = {
+    val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
+    val expired = new mutable.ArrayBuffer[String]
+    blacklist.asScala.foreach {
--- End diff --

I don't think it is needed, as a new collection is built when doing `asScala`, 
so we work on a definite snapshot of the original map.


---




[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...

2018-08-02 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21895#discussion_r207136973
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -461,32 +462,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.getPath)}")
       }
 
-      val tasks = updated.map { entry =>
+      val tasks = updated.flatMap { entry =>
         try {
-          replayExecutor.submit(new Runnable {
+          val task: Future[Unit] = replayExecutor.submit(new Runnable {
             override def run(): Unit = mergeApplicationListing(entry, newLastScanTime, true)
-          })
+          }, Unit)
+          Some(task -> entry.getPath)
         } catch {
           // let the iteration over the updated entries break, since an exception on
           // replayExecutor.submit (..) indicates the ExecutorService is unable
           // to take any more submissions at this time
           case e: Exception =>
             logError(s"Exception while submitting event log for replay", e)
-            null
+            None
         }
-      }.filter(_ != null)
+      }
 
       pendingReplayTasksCount.addAndGet(tasks.size)
 
       // Wait for all tasks to finish. This makes sure that checkForLogs
       // is not scheduled again while some tasks are already running in
       // the replayExecutor.
-      tasks.foreach { task =>
+      tasks.foreach { case (task, path) =>
         try {
           task.get()
         } catch {
           case e: InterruptedException =>
             throw e
+          case e: ExecutionException if e.getCause.isInstanceOf[AccessControlException] =>
+            // We don't have read permissions on the log file
+            logDebug(s"Unable to read log $path", e.getCause)
--- End diff --

Sure, will do, thanks


---




[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...

2018-08-02 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21895#discussion_r207136738
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -80,8 +80,8 @@ import org.apache.spark.util.kvstore._
  * break. Simple streaming of JSON-formatted events, as is implemented today, implicitly
  * maintains this invariant.
  */
-private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
-  extends ApplicationHistoryProvider with Logging {
+private[history] class FsHistoryProvider(conf: SparkConf, protected val clock: Clock)
+  extends ApplicationHistoryProvider with LogFilesBlacklisting with Logging {
--- End diff --

I just wanted to separate the blacklisting logic, since `FsHistoryProvider` 
already contains a lot of code, so I considered it more readable. If you 
prefer, I can inline it.


---




[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...

2018-08-02 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/21895#discussion_r207133206
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -973,6 +985,38 @@ private[history] object FsHistoryProvider {
   private[history] val CURRENT_LISTING_VERSION = 1L
 }
 
+/**
+ * Manages a blacklist containing the files which cannot be read due to lack of access permissions.
+ */
+private[history] trait LogFilesBlacklisting extends Logging {
+  protected def clock: Clock
+
+  /**
+   * Contains the name of blacklisted files and their insertion time.
+   */
+  private val blacklist = new ConcurrentHashMap[String, Long]
+
+  private[history] def isBlacklisted(path: Path): Boolean = {
+    blacklist.containsKey(path.getName)
+  }
+
+  private[history] def blacklist(path: Path): Unit = {
+    blacklist.put(path.getName, clock.getTimeMillis())
+  }
+
+  /**
+   * Removes expired entries in the blacklist, according to the provided `expireTimeInSeconds`.
+   */
+  protected def clearBlacklist(expireTimeInSeconds: Long): Unit = {
+    val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
+    val expired = new mutable.ArrayBuffer[String]
+    blacklist.asScala.foreach {
--- End diff --

Ideally the iteration should be synchronized, but I think it is not a big 
deal here.


---




[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...

2018-08-02 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/21895#discussion_r207131160
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -461,32 +462,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.getPath)}")
       }
 
-      val tasks = updated.map { entry =>
+      val tasks = updated.flatMap { entry =>
         try {
-          replayExecutor.submit(new Runnable {
+          val task: Future[Unit] = replayExecutor.submit(new Runnable {
             override def run(): Unit = mergeApplicationListing(entry, newLastScanTime, true)
-          })
+          }, Unit)
+          Some(task -> entry.getPath)
         } catch {
           // let the iteration over the updated entries break, since an exception on
           // replayExecutor.submit (..) indicates the ExecutorService is unable
           // to take any more submissions at this time
           case e: Exception =>
             logError(s"Exception while submitting event log for replay", e)
-            null
+            None
         }
-      }.filter(_ != null)
+      }
 
       pendingReplayTasksCount.addAndGet(tasks.size)
 
       // Wait for all tasks to finish. This makes sure that checkForLogs
       // is not scheduled again while some tasks are already running in
       // the replayExecutor.
-      tasks.foreach { task =>
+      tasks.foreach { case (task, path) =>
         try {
           task.get()
         } catch {
           case e: InterruptedException =>
             throw e
+          case e: ExecutionException if e.getCause.isInstanceOf[AccessControlException] =>
+            // We don't have read permissions on the log file
+            logDebug(s"Unable to read log $path", e.getCause)
--- End diff --

I would suggest using a warning log the first time we hit such an issue, to 
notify the user that some event logs cannot be read.
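
A standalone sketch of the "warn the first time, debug afterwards" pattern being suggested (the `WarnOnceLogger` helper and its println-based logging are my assumptions for illustration):

```
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper illustrating the suggested pattern: WARN only on the
// first failure per path, DEBUG on later ones.
class WarnOnceLogger {
  private val warned = ConcurrentHashMap.newKeySet[String]()

  def logUnreadable(path: String, cause: Throwable): Unit = {
    if (warned.add(path)) {
      // add() returns true only the first time this path is inserted.
      println(s"WARN: Unable to read log $path: ${cause.getMessage}")
    } else {
      println(s"DEBUG: Unable to read log $path: ${cause.getMessage}")
    }
  }
}
```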


---




[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...

2018-08-02 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/21895#discussion_r207128637
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -80,8 +80,8 @@ import org.apache.spark.util.kvstore._
  * break. Simple streaming of JSON-formatted events, as is implemented today, implicitly
  * maintains this invariant.
  */
-private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
-  extends ApplicationHistoryProvider with Logging {
+private[history] class FsHistoryProvider(conf: SparkConf, protected val clock: Clock)
+  extends ApplicationHistoryProvider with LogFilesBlacklisting with Logging {
--- End diff --

What is the special advantage of using a mixin trait rather than directly 
changing the code here in `FsHistoryProvider`?


---




[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...

2018-08-01 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21895#discussion_r206780805
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -973,6 +978,42 @@ private[history] object FsHistoryProvider {
   private[history] val CURRENT_LISTING_VERSION = 1L
 }
 
+private[history] trait CachedFileSystemHelper extends Logging {
+  protected def fs: FileSystem
+  protected def expireTimeInSeconds: Long
+
+  /**
+   * LRU cache containing the result for the already checked files.
+   */
+  // Visible for testing.
+  private[history] val cache = CacheBuilder.newBuilder()
+    .expireAfterAccess(expireTimeInSeconds, TimeUnit.SECONDS)
+    .build[String, java.lang.Boolean]()
--- End diff --

Memory doesn't increase indefinitely, as entries expire over time. Moreover, 
since we are storing only a string containing the name of the file and a 
Boolean, each entry needs about 100 bytes in memory. With 100k event logs, 
this means about 10MB, which doesn't seem to me a value that can cause an OOM. 
Anyway, we can also add a maximum number of entries for this cache if you 
think it is necessary; this would cause some more RPC calls, though.
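
For reference, a minimal sketch of bounding the Guava cache with `maximumSize`, as floated above (the 100000 cap and one-hour expiry are assumed values, not from the PR):

```
import java.util.concurrent.TimeUnit
import com.google.common.cache.CacheBuilder

// Bounded variant of the cache in the diff above: entries expire after an
// hour without access, and the size cap evicts least-recently-used entries.
val cache = CacheBuilder.newBuilder()
  .expireAfterAccess(3600L, TimeUnit.SECONDS)
  .maximumSize(100000)
  .build[String, java.lang.Boolean]()

cache.put("application_1234_0001.inprogress", java.lang.Boolean.TRUE)
// getIfPresent returns null when the entry is absent or has been evicted.
val readable = Option(cache.getIfPresent("application_1234_0001.inprogress"))
```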


---




[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...

2018-08-01 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21895#discussion_r206779479
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -973,6 +978,42 @@ private[history] object FsHistoryProvider {
   private[history] val CURRENT_LISTING_VERSION = 1L
 }
 
+private[history] trait CachedFileSystemHelper extends Logging {
--- End diff --

This is true, but the only way to avoid this issue is to call fs.access 
every time, which may cause huge performance issues. Moreover, I think it is 
very unlikely that a user manually changes the permissions of an application's 
event logs, and restarting the SHS in such a scenario would solve the problem. 
In the current state, even though a file is accessible, it is ignored, and the 
user has no workaround other than changing ownership or permissions on all the 
files, despite the fact that the user running the SHS can read them (moreover, 
it is a regression for these users)...

Anyway, if you have a better suggestion, I am more than happy to follow it.
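
For context, a sketch of the uncached per-call check being discussed (my illustration; `FileSystem.access` exists since Hadoop 2.6 and, on HDFS, takes ACLs into account):

```
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.security.AccessControlException

// Uncached variant: ask the file system on every call whether the current
// user may read the path; access() throws when permission is denied.
def canReadUncached(fs: FileSystem, path: Path): Boolean =
  try {
    fs.access(path, FsAction.READ)
    true
  } catch {
    case _: AccessControlException => false
  }
```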


---




[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...

2018-07-31 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/21895#discussion_r206726059
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -973,6 +978,42 @@ private[history] object FsHistoryProvider {
   private[history] val CURRENT_LISTING_VERSION = 1L
 }
 
+private[history] trait CachedFileSystemHelper extends Logging {
+  protected def fs: FileSystem
+  protected def expireTimeInSeconds: Long
+
+  /**
+   * LRU cache containing the result for the already checked files.
+   */
+  // Visible for testing.
+  private[history] val cache = CacheBuilder.newBuilder()
+    .expireAfterAccess(expireTimeInSeconds, TimeUnit.SECONDS)
+    .build[String, java.lang.Boolean]()
--- End diff --

In the real world, there will be many event logs under the folder; this will 
lead to memory increasing indefinitely and potentially to an OOM. We have seen 
a customer with more than 100K event logs in this folder.


---




[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...

2018-07-31 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/21895#discussion_r206725814
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -973,6 +978,42 @@ private[history] object FsHistoryProvider {
   private[history] val CURRENT_LISTING_VERSION = 1L
 }
 
+private[history] trait CachedFileSystemHelper extends Logging {
--- End diff --

As discussed offline, my main concern is about cache inconsistency if the user 
changes the file permissions while the cache entry is still valid.


---




[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...

2018-07-28 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21895#discussion_r205948923
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -973,6 +973,38 @@ private[history] object FsHistoryProvider {
   private[history] val CURRENT_LISTING_VERSION = 1L
 }
 
+private[history] trait CachedFileSystemHelper extends Logging {
+  protected def fs: FileSystem
+
+  /**
+   * Cache containing the result for the already checked files.
+   */
+  // Visible for testing.
+  private[history] val cache = new mutable.HashMap[String, Boolean]
--- End diff --

For a long-running history server in busy clusters (particularly where 
`spark.history.fs.cleaner.maxAge` is configured to be low), this Map will 
cause an OOM.
Either an LRU cache or a disk-backed map with periodic cleanup (based on 
maxAge) might be better?
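
As an illustration of the LRU alternative (an assumption, not this PR's implementation), a bounded cache can be built on java.util.LinkedHashMap's access order:

```
import java.util.{LinkedHashMap => JLinkedHashMap}
import java.util.Map.Entry

// Minimal LRU cache sketch: accessOrder = true makes iteration order follow
// recency, and removeEldestEntry evicts once the bound is exceeded.
class LruCache[K, V](maxEntries: Int) {
  private val underlying =
    new JLinkedHashMap[K, V](16, 0.75f, /* accessOrder = */ true) {
      override def removeEldestEntry(eldest: Entry[K, V]): Boolean =
        size() > maxEntries
    }

  def get(key: K): Option[V] = synchronized { Option(underlying.get(key)) }
  def put(key: K, value: V): Unit = synchronized { underlying.put(key, value) }
}

// Usage: cache whether a log file turned out to be readable.
val readableCache = new LruCache[String, java.lang.Boolean](maxEntries = 100000)
readableCache.put("app-1.inprogress", java.lang.Boolean.TRUE)
```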


---




[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...

2018-07-27 Thread mgaido91
GitHub user mgaido91 opened a pull request:

https://github.com/apache/spark/pull/21895

[SPARK-24948][SHS] Delegate check access permissions to the file system

## What changes were proposed in this pull request?

In `SparkHadoopUtil.checkAccessPermission`, we consider only basic 
permissions in order to check whether a user can access a file or not. This is 
not a complete check, as it ignores ACLs and other policies a file system may 
apply internally. So it can wrongly report that a user cannot access a file 
(even though they actually can).

The PR proposes to delegate the check of whether a file is accessible to the 
file system itself, in order to return the right result.
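
As an illustrative sketch of the idea (my example, not the exact patch): attempt the read and let the file system enforce its own access rules, surfacing `AccessControlException` when access is denied:

```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.AccessControlException

// Instead of re-deriving access rights from file metadata, try the operation
// itself: the file system applies permissions, ACLs, and any other internal
// policies, and throws when the caller may not read the file.
def canRead(fs: FileSystem, path: Path): Boolean =
  try {
    val in = fs.open(path)
    try true finally in.close()
  } catch {
    case _: AccessControlException => false
  }

val fs = FileSystem.get(new Configuration())
// Hypothetical event log path, for illustration only.
println(canRead(fs, new Path("/spark-history/application_1234_0001")))
```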

## How was this patch tested?

modified UTs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mgaido91/spark SPARK-24948

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21895.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21895


commit 1052c1765e075deaf76f0fedfe805808ce669b5b
Author: Marco Gaido 
Date:   2018-07-27T16:01:15Z

[SPARK-24948][SHS] Delegate check access permissions to the file system




---
