This is an automated email from the ASF dual-hosted git repository. dbtsai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 7eeca02 [SPARK-28157][CORE] Make SHS clear KVStore `LogInfo`s for the blacklisted entries 7eeca02 is described below commit 7eeca029404c8cc1e2c3e7ae8728b90582e25d76 Author: Dongjoon Hyun <dh...@apple.com> AuthorDate: Wed Jun 26 18:56:06 2019 +0000 [SPARK-28157][CORE] Make SHS clear KVStore `LogInfo`s for the blacklisted entries ## What changes were proposed in this pull request? At Spark 2.4.0/2.3.2/2.2.3, [SPARK-24948](https://issues.apache.org/jira/browse/SPARK-24948) delegated access permission checks to the file system, and maintained a blacklist for all event log files that failed once at reading. The blacklisted log files are released back after `CLEAN_INTERVAL_S` seconds. However, the released files whose sizes don't change are ignored forever due to the `info.fileSize < entry.getLen()` condition (previously [here](https://github.com/apache/spark/commit/3c96937c7b1d7a010b630f4b98fd22dafc37808b#diff-a7befb99e7bd7e3ab5c46c2568aa5b3eR454) and now at [shouldReloadLog](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L571)) which always returns `false` when the size is the same as the exi [...] This PR aims to remove the existing entry from `KVStore` when it goes to the blacklist. ## How was this patch tested? Pass the Jenkins with the updated test case. Closes #24966 from dongjoon-hyun/SPARK-28157. 
Authored-by: Dongjoon Hyun <dh...@apple.com> Signed-off-by: DB Tsai <d_t...@apple.com> --- .../apache/spark/deploy/history/FsHistoryProvider.scala | 3 +++ .../spark/deploy/history/FsHistoryProviderSuite.scala | 15 ++++++++++----- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 98265ff..f2ee599 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -536,6 +536,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // We don't have read permissions on the log file logWarning(s"Unable to read log $path", e.getCause) blacklist(path) + // SPARK-28157 We should remove this blacklisted entry from the KVStore + // to handle permission-only changes with the same file sizes later. + listing.delete(classOf[LogInfo], path.toString) case e: Exception => logError("Exception while merging application listings", e) } finally { diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 791814b..571c6e3 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -1122,17 +1122,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { writeFile(accessGranted, true, None, SparkListenerApplicationStart("accessGranted", Some("accessGranted"), 1L, "test", None), SparkListenerApplicationEnd(5L)) + var isReadable = false val mockedFs = spy(provider.fs) doThrow(new AccessControlException("Cannot read accessDenied file")).when(mockedFs).open( - argThat((path: Path) => path.getName.toLowerCase(Locale.ROOT) == 
"accessdenied")) + argThat((path: Path) => path.getName.toLowerCase(Locale.ROOT) == "accessdenied" && + !isReadable)) val mockedProvider = spy(provider) when(mockedProvider.fs).thenReturn(mockedFs) updateAndCheck(mockedProvider) { list => list.size should be(1) } - writeFile(accessDenied, true, None, - SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None), - SparkListenerApplicationEnd(5L)) // Doing 2 times in order to check the blacklist filter too updateAndCheck(mockedProvider) { list => list.size should be(1) @@ -1140,8 +1139,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { val accessDeniedPath = new Path(accessDenied.getPath) assert(mockedProvider.isBlacklisted(accessDeniedPath)) clock.advance(24 * 60 * 60 * 1000 + 1) // add a bit more than 1d + isReadable = true mockedProvider.cleanLogs() - assert(!mockedProvider.isBlacklisted(accessDeniedPath)) + updateAndCheck(mockedProvider) { list => + assert(!mockedProvider.isBlacklisted(accessDeniedPath)) + assert(list.exists(_.name == "accessDenied")) + assert(list.exists(_.name == "accessGranted")) + list.size should be(2) + } } test("check in-progress event logs absolute length") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org