This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new d5cc890  [SPARK-26708][SQL][BRANCH-2.4] Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan
d5cc890 is described below

commit d5cc8909c72e958ce187df9c75847ad0125991ab
Author: maryannxue <maryann...@apache.org>
AuthorDate: Tue Jan 29 21:33:46 2019 +0900

    [SPARK-26708][SQL][BRANCH-2.4] Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan

    ## What changes were proposed in this pull request?

    When performing non-cascading cache invalidation, `recache` is called on the other cache
    entries that depend on the cache being invalidated. This leads to the physical plans of
    those cache entries being re-compiled. For those cache entries, if the cache RDD has
    already been persisted, chances are there will be inconsistency between the data and the
    new plan. It can cause a correctness issue if the new plan's `outputPartitioning` or
    `outputOrdering` is different from the tha [...]

    The fix is to keep a cache entry as it is if its data has already been loaded; otherwise,
    re-build the entry with a new plan and an empty cache buffer.

    ## How was this patch tested?

    Added UT.

    Closes #23678 from maryannxue/spark-26708-2.4.

    Authored-by: maryannxue <maryann...@apache.org>
    Signed-off-by: Takeshi Yamamuro <yamam...@apache.org>
---
 .../apache/spark/sql/execution/CacheManager.scala  | 28 +++++++++++---
 .../sql/execution/columnar/InMemoryRelation.scala  | 10 +----
 .../org/apache/spark/sql/DatasetCacheSuite.scala   | 44 +++++++++++++++++++++-
 3 files changed, 67 insertions(+), 15 deletions(-)
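Before reading the diff, here is a minimal sketch of the decision the patch adds to the recache path. It is illustrative only: CachedEntry, recacheEntry, and recompile are simplified stand-ins, not Spark's real CachedData/CachedRDDBuilder API.

    object RecacheSketch {
      // Illustrative stand-in for a CacheManager entry; not Spark's real CachedData.
      final case class CachedEntry(plan: String, isLoaded: Boolean)

      // The patch's rule: rebuild only if the caller clears the buffer (cascading
      // invalidation) or the buffer was never materialized; otherwise keep the
      // entry untouched so the loaded data and its plan stay consistent.
      def recacheEntry(
          entry: CachedEntry,
          clearCache: Boolean,
          recompile: String => String): CachedEntry = {
        val buildNewPlan = clearCache || !entry.isLoaded
        if (buildNewPlan) CachedEntry(recompile(entry.plan), isLoaded = false)
        else entry
      }
    }

For example, recacheEntry(CachedEntry("plan", isLoaded = true), clearCache = false, identity) returns the entry unchanged; before this patch such an entry would have had its plan re-compiled while keeping its already-loaded data, which is exactly the inconsistency described above.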
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index c992993..5b30596 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -166,16 +166,34 @@ class CacheManager extends Logging {
     val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
     while (it.hasNext) {
       val cd = it.next()
-      if (condition(cd.plan)) {
-        if (clearCache) {
-          cd.cachedRepresentation.cacheBuilder.clearCache()
-        }
+      // If `clearCache` is false (which means the recache request comes from a non-cascading
+      // cache invalidation) and the cache buffer has already been loaded, we do not need to
+      // re-compile a physical plan because the old plan will not be used any more by the
+      // CacheManager although it still lives in compiled `Dataset`s and it could still work.
+      // Otherwise, it means either `clearCache` is true, then we have to clear the cache buffer
+      // and re-compile the physical plan; or it is a non-cascading cache invalidation and the
+      // cache buffer is still empty, then we could have a more efficient new plan by removing
+      // dependency on the previously removed cache entries.
+      // Note that the `CachedRDDBuilder`.`isCachedColumnBuffersLoaded` call is a non-locking
+      // status test and may not return the most accurate cache buffer state. So the worst-case
+      // scenario can be:
+      // 1) The buffer has been loaded, but `isCachedColumnBuffersLoaded` returns false, then we
+      //    will clear the buffer and build a new plan. It is inefficient but doesn't affect
+      //    correctness.
+      // 2) The buffer has been cleared, but `isCachedColumnBuffersLoaded` returns true, then we
+      //    will keep it as it is. It means the physical plan has been re-compiled already in the
+      //    other thread.
+      val buildNewPlan =
+        clearCache || !cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
+      if (condition(cd.plan) && buildNewPlan) {
+        cd.cachedRepresentation.cacheBuilder.clearCache()
         // Remove the cache entry before we create a new one, so that we can have a different
         // physical plan.
         it.remove()
         val plan = spark.sessionState.executePlan(cd.plan).executedPlan
         val newCache = InMemoryRelation(
-          cacheBuilder = cd.cachedRepresentation.cacheBuilder.withCachedPlan(plan),
+          cacheBuilder = cd.cachedRepresentation
+            .cacheBuilder.copy(cachedPlan = plan)(_cachedColumnBuffers = null),
           logicalPlan = cd.plan)
         needToRecache += cd.copy(cachedRepresentation = newCache)
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index b752b77..8eecd7a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -74,14 +74,8 @@ case class CachedRDDBuilder(
     }
   }
 
-  def withCachedPlan(cachedPlan: SparkPlan): CachedRDDBuilder = {
-    new CachedRDDBuilder(
-      useCompression,
-      batchSize,
-      storageLevel,
-      cachedPlan = cachedPlan,
-      tableName
-    )(_cachedColumnBuffers)
+  def isCachedColumnBuffersLoaded: Boolean = {
+    _cachedColumnBuffers != null
   }
 
   private def buildBuffers(): RDD[CachedBatch] = {
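The new isCachedColumnBuffersLoaded probe is a plain null check with no locking, which is why the comment added to CacheManager.scala spells out both stale-read outcomes. A minimal sketch of that contract, using a simplified stand-in class rather than the real CachedRDDBuilder:

    // Simplified stand-in for CachedRDDBuilder's buffer state; not the real class.
    class BufferStateSketch {
      // Null until the cache buffer is first materialized.
      @volatile private var _cachedColumnBuffers: AnyRef = null

      // Non-locking probe: may return a stale answer. A false negative only causes
      // an unnecessary but correct rebuild; a false positive means another thread
      // has already cleared the buffer and re-compiled the plan.
      def isCachedColumnBuffersLoaded: Boolean = _cachedColumnBuffers != null

      def load(buffers: AnyRef): Unit = { _cachedColumnBuffers = buffers }
      def clear(): Unit = { _cachedColumnBuffers = null }
    }

Keeping the probe lock-free avoids synchronization on every recache pass; the caller tolerates both stale outcomes, as the enumerated worst cases show.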
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
index 5c6a021..7c97f5c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
@@ -190,9 +190,9 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
 
     df1.unpersist(blocking = true)
 
-    // df1 un-cached; df2's cache plan re-compiled
+    // df1 un-cached; df2's cache plan stays the same
     assert(df1.storageLevel == StorageLevel.NONE)
-    assertCacheDependency(df1.groupBy('a).agg(sum('b)), 0)
+    assertCacheDependency(df1.groupBy('a).agg(sum('b)))
 
     val df4 = df1.groupBy('a).agg(sum('b)).agg(sum("sum(b)"))
     assertCached(df4)
@@ -206,4 +206,44 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
     // first time use, load cache
     checkDataset(df5, Row(10))
   }
+
+  test("SPARK-26708 Cache data and cached plan should stay consistent") {
+    val df = spark.range(0, 5).toDF("a")
+    val df1 = df.withColumn("b", 'a + 1)
+    val df2 = df.filter('a > 1)
+
+    df.cache()
+    // Add df1 to the CacheManager; the buffer is currently empty.
+    df1.cache()
+    // After calling collect(), df1's buffer has been loaded.
+    df1.collect()
+    // Add df2 to the CacheManager; the buffer is currently empty.
+    df2.cache()
+
+    // Verify that df1 is an InMemoryRelation plan with a dependency on another cached plan.
+    assertCacheDependency(df1)
+    val df1InnerPlan = df1.queryExecution.withCachedData
+      .asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan
+    // Verify that df2 is an InMemoryRelation plan with a dependency on another cached plan.
+    assertCacheDependency(df2)
+
+    df.unpersist(blocking = true)
+
+    // Verify that df1's cache has stayed the same, since df1's cache already had data
+    // before df.unpersist().
+    val df1Limit = df1.limit(2)
+    val df1LimitInnerPlan = df1Limit.queryExecution.withCachedData.collectFirst {
+      case i: InMemoryRelation => i.cacheBuilder.cachedPlan
+    }
+    assert(df1LimitInnerPlan.isDefined && df1LimitInnerPlan.get == df1InnerPlan)
+
+    // Verify that df2's cache has been re-cached, with a new physical plan free of any
+    // dependency on df, since df2's cache had not been loaded before df.unpersist().
+    val df2Limit = df2.limit(2)
+    val df2LimitInnerPlan = df2Limit.queryExecution.withCachedData.collectFirst {
+      case i: InMemoryRelation => i.cacheBuilder.cachedPlan
+    }
+    assert(df2LimitInnerPlan.isDefined &&
+      df2LimitInnerPlan.get.find(_.isInstanceOf[InMemoryTableScanExec]).isEmpty)
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org