This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 86237bba262 [SPARK-44504][SS] Unload provider thereby forcing DB instance close and releasing resources on maintenance task error
86237bba262 is described below

commit 86237bba2625ad0cf5325c85e342e6230d7a0699
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Fri Jul 21 13:46:01 2023 +0900

    [SPARK-44504][SS] Unload provider thereby forcing DB instance close and releasing resources on maintenance task error
    
    ### What changes were proposed in this pull request?
    Unload the loaded state store providers when the maintenance task hits an error, thereby forcing the DB instances to close and release their resources.
    
    ### Why are the changes needed?
    Without an explicit close, the DB instances and their associated resources (memory, file descriptors, etc.) are left open, and the references to these objects are lost once loadedProviders is cleared in the maintenance task's error handler.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing unit tests
    
    ```
    ), ForkJoinPool.commonPool-worker-5 (daemon=true), ForkJoinPool.commonPool-worker-17 (daemon=true), shuffle-boss-6-1 (daemon=true), ForkJoinPool.commonPool-worker-3 (daemon=true), ForkJoinPool.commonPool-worker-31 (daemon=true), ForkJoinPool.commonPool-worker-23 (daemon=true), state-store-maintenance-task (daemon=true), ForkJoinPool.commonPool-worker-9 (daemon=true) =====
    [info] Run completed in 2 minutes, 49 seconds.
    [info] Total number of tests run: 32
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 32, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    Closes #42098 from anishshri-db/task/SPARK-44504.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../org/apache/spark/sql/execution/streaming/state/RocksDB.scala      | 4 ++++
 .../org/apache/spark/sql/execution/streaming/state/StateStore.scala   | 3 +++
 2 files changed, 7 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 65299ea37ef..386df61a9e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -462,6 +462,8 @@ class RocksDB(
   /** Release all resources */
   def close(): Unit = {
     try {
+      // Acquire DB instance lock and release at the end to allow for synchronized access
+      acquire()
       closeDB()
 
       readOptions.close()
@@ -476,6 +478,8 @@ class RocksDB(
     } catch {
       case e: Exception =>
         logWarning("Error closing RocksDB", e)
+    } finally {
+      release()
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 96c7b61f205..8a09b226a0c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -611,6 +611,9 @@ object StateStore extends Logging {
           onError = { loadedProviders.synchronized {
               logInfo("Stopping maintenance task since an error was encountered.")
               stopMaintenanceTask()
+              // SPARK-44504 - Unload explicitly to force closing underlying DB instance
+              // and releasing allocated resources, especially for RocksDBStateStoreProvider.
+              loadedProviders.keySet.foreach { key => unload(key) }
               loadedProviders.clear()
             }
           }

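The RocksDB.scala hunk above has close() acquire the DB instance lock before tearing anything down and release it in a finally block. The following Scala sketch shows that shape under stated assumptions: `GuardedResource`, `withInstance` and `GuardedResourceDemo` are hypothetical names, and a `java.util.concurrent.locks.ReentrantLock` is swapped in for RocksDB's own acquire()/release() implementation, which the patch does not show.

```scala
import java.util.concurrent.locks.ReentrantLock

// Hypothetical resource whose close() follows the acquire-in-try /
// release-in-finally shape of the patched RocksDB.close(). The ReentrantLock
// is a stand-in for RocksDB's own acquire()/release(); this is not Spark code.
final class GuardedResource {
  private val instanceLock = new ReentrantLock()
  @volatile private var open: Boolean = true

  private def acquire(): Unit = instanceLock.lock()

  // Only unlock if this thread actually holds the lock, so a failure inside
  // acquire() is not followed by an IllegalMonitorStateException here.
  private def release(): Unit =
    if (instanceLock.isHeldByCurrentThread) instanceLock.unlock()

  def isOpen: Boolean = open

  // Regular operations hold the instance lock for their whole duration.
  def withInstance[T](body: => T): T = {
    acquire()
    try {
      require(open, "resource already closed")
      body
    } finally {
      release()
    }
  }

  // close() takes the same lock, so it waits for any in-flight withInstance
  // call instead of tearing the resource down underneath it.
  def close(): Unit = {
    try {
      acquire()
      open = false // stands in for closeDB(), readOptions.close(), ...
    } catch {
      case e: Exception => System.err.println(s"Error closing resource: $e")
    } finally {
      release()
    }
  }
}

object GuardedResourceDemo {
  def main(args: Array[String]): Unit = {
    val resource = new GuardedResource
    println(resource.withInstance(21 * 2)) // prints 42
    resource.close()
    println(resource.isOpen) // prints false
  }
}
```

Calling acquire() inside the try mirrors the patch; the guard in release() is this sketch's own choice, so that the finally block stays safe even if acquiring the lock fails.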

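The StateStore.scala hunk unloads every entry in loadedProviders before clearing the map. Below is a minimal Scala sketch of why that matters. It assumes that unloading a provider means removing it from the map and then closing it. `FakeProvider`, `UnloadOnErrorSketch`, `onErrorLeaky` and `onErrorFixed` are hypothetical names, and the mutable map only plays the role of loadedProviders.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a provider that owns native resources
// (the role RocksDBStateStoreProvider plays in Spark); illustrative only.
final class FakeProvider(val id: String) extends AutoCloseable {
  @volatile var closed: Boolean = false
  override def close(): Unit = { closed = true }
}

object UnloadOnErrorSketch {
  // Hypothetical registry playing the role of StateStore.loadedProviders.
  val loadedProviders: mutable.HashMap[String, FakeProvider] =
    mutable.HashMap.empty[String, FakeProvider]

  // Assumed unload semantics: remove the entry, then close the provider.
  def unload(key: String): Unit = loadedProviders.synchronized {
    loadedProviders.remove(key).foreach(_.close())
  }

  // Before the patch: clearing the map drops the only references to the
  // providers without closing them, so their resources stay allocated.
  def onErrorLeaky(): Unit = loadedProviders.synchronized {
    loadedProviders.clear()
  }

  // After the patch: unload (and therefore close) every provider, then clear.
  def onErrorFixed(): Unit = loadedProviders.synchronized {
    loadedProviders.keySet.toList.foreach(unload)
    loadedProviders.clear()
  }

  def main(args: Array[String]): Unit = {
    val leaked = new FakeProvider("leaked")
    loadedProviders.update("leaked", leaked)
    onErrorLeaky()
    println(s"leaky: closed=${leaked.closed}") // prints leaky: closed=false

    val released = new FakeProvider("released")
    loadedProviders.update("released", released)
    onErrorFixed()
    println(s"fixed: closed=${released.closed}") // prints fixed: closed=true
  }
}
```

The sketch copies the keys with `toList` before unloading because `unload` removes entries from the same map. The trailing `clear()` then has nothing left to remove and is kept only to mirror the patch.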