This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 86237bba262 [SPARK-44504][SS] Unload provider thereby forcing DB instance close and releasing resources on maintenance task error
86237bba262 is described below

commit 86237bba2625ad0cf5325c85e342e6230d7a0699
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Fri Jul 21 13:46:01 2023 +0900

    [SPARK-44504][SS] Unload provider thereby forcing DB instance close and releasing resources on maintenance task error

    ### What changes were proposed in this pull request?

    Unload the provider on a maintenance task error, thereby forcing the DB instance to close and releasing its resources.

    ### Why are the changes needed?

    If we don't close the DB instance, it and its corresponding resources (memory, file descriptors, etc.) are left open forever, and the pointer to these objects is lost once loadedProviders is cleared.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Existing unit tests:

    ```
    ), ForkJoinPool.commonPool-worker-5 (daemon=true), ForkJoinPool.commonPool-worker-17 (daemon=true),
    shuffle-boss-6-1 (daemon=true), ForkJoinPool.commonPool-worker-3 (daemon=true),
    ForkJoinPool.commonPool-worker-31 (daemon=true), ForkJoinPool.commonPool-worker-23 (daemon=true),
    state-store-maintenance-task (daemon=true), ForkJoinPool.commonPool-worker-9 (daemon=true)
    =====
    [info] Run completed in 2 minutes, 49 seconds.
    [info] Total number of tests run: 32
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 32, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```

    Closes #42098 from anishshri-db/task/SPARK-44504.
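The two mechanics in this change (closing the DB under its instance lock, and unloading each provider before clearing the registry so the underlying instance actually gets closed) can be sketched in isolation as follows. This is a minimal illustrative sketch: `FakeDB`, `ProviderRegistry`, and all other names here are hypothetical stand-ins, not Spark's actual classes.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical stand-in for a RocksDB-like instance. close() takes the
// instance lock so a concurrent task cannot use the DB while it is being
// torn down, and releases the lock in a finally block.
class FakeDB(val id: String) {
  private val instanceLock = new ReentrantLock()
  var closed = false

  private def acquire(): Unit = instanceLock.lock()
  private def release(): Unit = instanceLock.unlock()

  def close(): Unit = {
    try {
      // Acquire the instance lock and release it at the end, mirroring the
      // acquire()/release() pair added to close() in this commit.
      acquire()
      closed = true
    } finally {
      release()
    }
  }
}

// Hypothetical registry of loaded providers. On a maintenance error, each
// provider is unloaded (closed) explicitly before the map is cleared;
// clearing alone would drop the references but leak the open instances.
object ProviderRegistry {
  private val loadedProviders = mutable.HashMap.empty[String, FakeDB]

  def load(key: String): FakeDB =
    loadedProviders.getOrElseUpdate(key, new FakeDB(key))

  def size: Int = loadedProviders.synchronized { loadedProviders.size }

  def onMaintenanceError(): Unit = loadedProviders.synchronized {
    // Snapshot the keys before mutating the map, then close each instance.
    loadedProviders.keySet.toSeq.foreach { key =>
      loadedProviders.remove(key).foreach(_.close())
    }
    loadedProviders.clear()
  }
}
```

After `onMaintenanceError()`, every previously loaded instance reports `closed == true` and the registry is empty, which is the invariant the commit restores for the real provider map.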
Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../org/apache/spark/sql/execution/streaming/state/RocksDB.scala    | 4 ++++
 .../org/apache/spark/sql/execution/streaming/state/StateStore.scala | 3 +++
 2 files changed, 7 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 65299ea37ef..386df61a9e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -462,6 +462,8 @@ class RocksDB(
   /** Release all resources */
   def close(): Unit = {
     try {
+      // Acquire DB instance lock and release at the end to allow for synchronized access
+      acquire()
       closeDB()

       readOptions.close()
@@ -476,6 +478,8 @@ class RocksDB(
     } catch {
       case e: Exception =>
         logWarning("Error closing RocksDB", e)
+    } finally {
+      release()
     }
   }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 96c7b61f205..8a09b226a0c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -611,6 +611,9 @@ object StateStore extends Logging {
       onError = { loadedProviders.synchronized {
         logInfo("Stopping maintenance task since an error was encountered.")
         stopMaintenanceTask()
+        // SPARK-44504 - Unload explicitly to force closing underlying DB instance
+        // and releasing allocated resources, especially for RocksDBStateStoreProvider.
+        loadedProviders.keySet.foreach { key => unload(key) }
         loadedProviders.clear()
       } }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org