This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a17851c  [SPARK-26548][SQL] Don't hold CacheManager write lock while computing executedPlan
a17851c is described below

commit a17851cb95687963936c4d4a7eed132ee2c10677
Author: Dave DeCaprio <da...@alum.mit.edu>
AuthorDate: Sat Jan 5 19:20:35 2019 -0800

    [SPARK-26548][SQL] Don't hold CacheManager write lock while computing executedPlan
    
    ## What changes were proposed in this pull request?
    
    This PR addresses SPARK-26548. In Spark 2.4.0, the CacheManager holds a
    write lock while computing the executedPlan for a cached logicalPlan. For
    very large query plans this can be an expensive operation, taking minutes
    to run, and the entire cache is blocked during that time. This PR changes
    cacheQuery so that the write lock is acquired only after the executedPlan
    has been generated, which reduces the time the lock is held to the time
    needed to update the shared data structure. Because the plan is now
    computed outside the lock, the cache is checked again once the write lock
    is held, so a plan cached concurrently by another caller is not added
    twice.
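
The locking pattern described above can be illustrated with a minimal, self-contained Scala sketch. This is not the Spark code: `PlanCache`, `expensiveBuild`, and the `readLock`/`writeLock` helpers below are hypothetical stand-ins for CacheManager, the executedPlan computation, and its lock helpers. The sketch only shows the idea of doing the slow work with no lock held and then re-checking under the write lock before updating the shared structure.

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable

// Hypothetical stand-in for CacheManager; all names are illustrative only.
class PlanCache[K, V](expensiveBuild: K => V) {
  private val lock = new ReentrantReadWriteLock()
  private val entries = mutable.LinkedHashMap.empty[K, V]

  // Run `body` while holding the read lock.
  private def readLock[A](body: => A): A = {
    val l = lock.readLock()
    l.lock()
    try body finally l.unlock()
  }

  // Run `body` while holding the write lock.
  private def writeLock[A](body: => A): A = {
    val l = lock.writeLock()
    l.lock()
    try body finally l.unlock()
  }

  def lookup(key: K): Option[V] = readLock { entries.get(key) }

  /** Caches `key`; returns true only if this call inserted the entry. */
  def cache(key: K): Boolean = {
    if (lookup(key).nonEmpty) {
      false // already cached, nothing to do
    } else {
      // The slow step runs with no lock held, so other callers are not blocked.
      val value = expensiveBuild(key)
      writeLock {
        // Re-check: another thread may have cached the same key in the meantime.
        if (entries.contains(key)) {
          false
        } else {
          entries.put(key, value)
          true
        }
      }
    }
  }
}
```

The trade-off in this sketch is that two threads racing on the same key may both run `expensiveBuild`, and one result is discarded. In exchange, the write lock is held only for the short map update.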
    
    gatorsmile and cloud-fan - You have committed patches in this area
    before. This is a small incremental change.
    
    ## How was this patch tested?
    
    This has been tested on a live system where the blocking was causing
    major issues, and it is working well. CacheManager has no explicit unit
    test but is used in many places internally as part of the SharedState.
    
    Closes #23469 from DaveDeCaprio/optimizer-unblocked.
    
    Lead-authored-by: Dave DeCaprio <da...@alum.mit.edu>
    Co-authored-by: David DeCaprio <da...@alum.mit.edu>
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>
---
 .../scala/org/apache/spark/sql/execution/CacheManager.scala    | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index c992993..728fde5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -88,7 +88,7 @@ class CacheManager extends Logging {
   def cacheQuery(
       query: Dataset[_],
       tableName: Option[String] = None,
-      storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
+      storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = {
     val planToCache = query.logicalPlan
     if (lookupCachedData(planToCache).nonEmpty) {
       logWarning("Asked to cache already cached data.")
@@ -100,7 +100,13 @@ class CacheManager extends Logging {
         sparkSession.sessionState.executePlan(planToCache).executedPlan,
         tableName,
         planToCache)
-      cachedData.add(CachedData(planToCache, inMemoryRelation))
+      writeLock {
+        if (lookupCachedData(planToCache).nonEmpty) {
+          logWarning("Data has already been cached.")
+        } else {
+          cachedData.add(CachedData(planToCache, inMemoryRelation))
+        }
+      }
     }
   }
 

