This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new 960375f  [SPARK-28157][CORE][2.3] Make SHS clear KVStore `LogInfo`s for the blacklisted entries
960375f is described below

commit 960375f6e9ec3a151af208c0ebc354408c92534c
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Thu Jun 27 16:58:39 2019 +0000

    [SPARK-28157][CORE][2.3] Make SHS clear KVStore `LogInfo`s for the blacklisted entries
    
    ## What changes were proposed in this pull request?
    
    In Spark 2.4.0/2.3.2/2.2.3, [SPARK-24948](https://issues.apache.org/jira/browse/SPARK-24948) delegated access permission checks to the file system and introduced a blacklist of event log files that have failed at least once during reading. Blacklisted log files are released after `CLEAN_INTERVAL_S` seconds.
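
    As a rough sketch (not the exact SHS internals; the class and member names below are illustrative), such a time-based blacklist can be modeled as a map from path to blacklisting timestamp:

    ```scala
    import java.util.concurrent.ConcurrentHashMap

    // Illustrative time-based blacklist: each path maps to the time (ms)
    // at which it was blacklisted.
    class LogBlacklist(clock: () => Long) {
      private val entries = new ConcurrentHashMap[String, Long]()

      def blacklist(path: String): Unit = entries.put(path, clock())

      def isBlacklisted(path: String): Boolean = entries.containsKey(path)

      // Release entries older than expireTimeInSeconds, mirroring the
      // CLEAN_INTERVAL_S-based release described above.
      def clearExpired(expireTimeInSeconds: Long): Unit = {
        val threshold = clock() - expireTimeInSeconds * 1000
        val it = entries.entrySet().iterator()
        while (it.hasNext) {
          if (it.next().getValue < threshold) it.remove()
        }
      }
    }
    ```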
    
    However, released files whose sizes don't change are then ignored forever due to the `info.fileSize < entry.getLen()` condition (previously [here](https://github.com/apache/spark/commit/3c96937c7b1d7a010b630f4b98fd22dafc37808b#diff-a7befb99e7bd7e3ab5c46c2568aa5b3eR454) and now in [shouldReloadLog](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L571)), which always returns `false` when the size is the same as the existing [...]
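
    In reduced form, the problematic check looks roughly like this (`info.fileSize` and `entry.getLen()` come from the linked code; the scaffolding around them is illustrative and omits the real method's other conditions):

    ```scala
    // `LogInfo` stands in for the KVStore record for one event log file;
    // `entryLen` is the length currently reported by the file system.
    case class LogInfo(logPath: String, fileSize: Long)

    def shouldReloadLog(info: LogInfo, entryLen: Long): Boolean = {
      // A released (formerly blacklisted) file whose size never changed fails
      // this check forever: fileSize == entryLen, so the log is never re-read
      // even after a permission-only change makes it readable again.
      info.fileSize < entryLen
    }
    ```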
    
    This PR aims to remove the existing entry from the `KVStore` when the corresponding log file is blacklisted.
    
    ## How was this patch tested?
    
    Pass the Jenkins with the updated test case.
    
    Closes #24977 from dongjoon-hyun/SPARK-28157-2.3.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: DB Tsai <d_t...@apple.com>
---
 .../apache/spark/deploy/history/FsHistoryProvider.scala    |  3 +++
 .../spark/deploy/history/FsHistoryProviderSuite.scala      | 14 +++++++++-----
 2 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 23572eb..98b64a5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -497,6 +497,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
             // We don't have read permissions on the log file
             logWarning(s"Unable to read log $path", e.getCause)
             blacklist(path)
+            // SPARK-28157 We should remove this blacklisted entry from the KVStore
+            // to handle permission-only changes with the same file sizes later.
+            listing.delete(classOf[LogInfo], path.toString)
           case e: Exception =>
             logError("Exception while merging application listings", e)
         } finally {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index bf2b044..24daad2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -786,11 +786,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     writeFile(accessGranted, true, None,
       SparkListenerApplicationStart("accessGranted", Some("accessGranted"), 
1L, "test", None),
       SparkListenerApplicationEnd(5L))
+    var isReadable = false
     val mockedFs = spy(provider.fs)
     doThrow(new AccessControlException("Cannot read accessDenied file")).when(mockedFs).open(
       argThat(new ArgumentMatcher[Path]() {
         override def matches(path: Any): Boolean = {
-          path.asInstanceOf[Path].getName.toLowerCase == "accessdenied"
+          path.asInstanceOf[Path].getName.toLowerCase == "accessdenied" && !isReadable
         }
       }))
     val mockedProvider = spy(provider)
@@ -798,9 +799,6 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     updateAndCheck(mockedProvider) { list =>
       list.size should be(1)
     }
-    writeFile(accessDenied, true, None,
-      SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None),
-      SparkListenerApplicationEnd(5L))
     // Doing 2 times in order to check the blacklist filter too
     updateAndCheck(mockedProvider) { list =>
       list.size should be(1)
@@ -808,8 +806,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     val accessDeniedPath = new Path(accessDenied.getPath)
     assert(mockedProvider.isBlacklisted(accessDeniedPath))
     clock.advance(24 * 60 * 60 * 1000 + 1) // add a bit more than 1d
+    isReadable = true
     mockedProvider.cleanLogs()
-    assert(!mockedProvider.isBlacklisted(accessDeniedPath))
+    updateAndCheck(mockedProvider) { list =>
+      assert(!mockedProvider.isBlacklisted(accessDeniedPath))
+      assert(list.exists(_.name == "accessDenied"))
+      assert(list.exists(_.name == "accessGranted"))
+      list.size should be(2)
+    }
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
