This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7eeca02  [SPARK-28157][CORE] Make SHS clear KVStore `LogInfo`s for the 
blacklisted entries
7eeca02 is described below

commit 7eeca029404c8cc1e2c3e7ae8728b90582e25d76
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Wed Jun 26 18:56:06 2019 +0000

    [SPARK-28157][CORE] Make SHS clear KVStore `LogInfo`s for the blacklisted 
entries
    
    ## What changes were proposed in this pull request?
    
    At Spark 2.4.0/2.3.2/2.2.3, 
[SPARK-24948](https://issues.apache.org/jira/browse/SPARK-24948) delegated 
access permission checks to the file system, and maintained a blacklist of all 
event log files that failed once during reading. The blacklisted log files are released 
back after `CLEAN_INTERVAL_S` seconds.
    
    However, the released files whose sizes don't change are ignored forever 
due to the `info.fileSize < entry.getLen()` condition (previously 
[here](https://github.com/apache/spark/commit/3c96937c7b1d7a010b630f4b98fd22dafc37808b#diff-a7befb99e7bd7e3ab5c46c2568aa5b3eR454)
 and now at 
[shouldReloadLog](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L571))
 which always returns `false` when the size is the same as the existing entry's.
    
    This PR aims to remove the existing entry from `KVStore` when it goes to 
the blacklist.
    
    ## How was this patch tested?
    
    Pass the Jenkins with the updated test case.
    
    Closes #24966 from dongjoon-hyun/SPARK-28157.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: DB Tsai <d_t...@apple.com>
---
 .../apache/spark/deploy/history/FsHistoryProvider.scala   |  3 +++
 .../spark/deploy/history/FsHistoryProviderSuite.scala     | 15 ++++++++++-----
 2 files changed, 13 insertions(+), 5 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 98265ff..f2ee599 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -536,6 +536,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
             // We don't have read permissions on the log file
             logWarning(s"Unable to read log $path", e.getCause)
             blacklist(path)
+            // SPARK-28157 We should remove this blacklisted entry from the 
KVStore
+            // to handle permission-only changes with the same file sizes 
later.
+            listing.delete(classOf[LogInfo], path.toString)
           case e: Exception =>
             logError("Exception while merging application listings", e)
         } finally {
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 791814b..571c6e3 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -1122,17 +1122,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with 
Matchers with Logging {
     writeFile(accessGranted, true, None,
       SparkListenerApplicationStart("accessGranted", Some("accessGranted"), 
1L, "test", None),
       SparkListenerApplicationEnd(5L))
+    var isReadable = false
     val mockedFs = spy(provider.fs)
     doThrow(new AccessControlException("Cannot read accessDenied 
file")).when(mockedFs).open(
-      argThat((path: Path) => path.getName.toLowerCase(Locale.ROOT) == 
"accessdenied"))
+      argThat((path: Path) => path.getName.toLowerCase(Locale.ROOT) == 
"accessdenied" &&
+        !isReadable))
     val mockedProvider = spy(provider)
     when(mockedProvider.fs).thenReturn(mockedFs)
     updateAndCheck(mockedProvider) { list =>
       list.size should be(1)
     }
-    writeFile(accessDenied, true, None,
-      SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, 
"test", None),
-      SparkListenerApplicationEnd(5L))
     // Doing 2 times in order to check the blacklist filter too
     updateAndCheck(mockedProvider) { list =>
       list.size should be(1)
@@ -1140,8 +1139,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with 
Matchers with Logging {
     val accessDeniedPath = new Path(accessDenied.getPath)
     assert(mockedProvider.isBlacklisted(accessDeniedPath))
     clock.advance(24 * 60 * 60 * 1000 + 1) // add a bit more than 1d
+    isReadable = true
     mockedProvider.cleanLogs()
-    assert(!mockedProvider.isBlacklisted(accessDeniedPath))
+    updateAndCheck(mockedProvider) { list =>
+      assert(!mockedProvider.isBlacklisted(accessDeniedPath))
+      assert(list.exists(_.name == "accessDenied"))
+      assert(list.exists(_.name == "accessGranted"))
+      list.size should be(2)
+    }
   }
 
   test("check in-progress event logs absolute length") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to