This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 49eefc5d1d92 [SPARK-47722][SS] Wait until RocksDB background work 
finish before closing
49eefc5d1d92 is described below

commit 49eefc5d1d9255ca8db624925d813cc29460f4c7
Author: Wei Liu <wei....@databricks.com>
AuthorDate: Thu Apr 4 08:45:27 2024 +0900

    [SPARK-47722][SS] Wait until RocksDB background work finish before closing
    
    ### What changes were proposed in this pull request?
    
    When closing the RocksDB instance, we need to wait until all background
work finishes. Otherwise, the following error can be observed:
    
    ```
    24/03/29 06:47:11 INFO RocksDB StateStoreId(opId=0,partId=0,name=default): 
[NativeRocksDB-2] [/error_handler.cc:396] Background IO error IO error: No such 
file or directory: While open a file for appending: 
/ephemeral/tmp/spark-efd53c17-2b8a-4f80-aca0-b767dc06be3d/StateStoreId(opId=0,partId=0,name=default)-732271d8-03b3-4046-a911-5797804df25c/workingDir-3a85e625-e4fb-4a78-b668-941ca16cc7a2/000008.sst:
 No such file or directory
    24/03/29 06:47:11 ERROR RocksDB StateStoreId(opId=0,partId=0,name=default): 
[NativeRocksDB-3] [/db_impl/db_impl_compaction_flush.cc:3021] Waiting after 
background flush error: IO error: No such file or directory: While open a file 
for appending: 
/ephemeral/tmp/spark-efd53c17-2b8a-4f80-aca0-b767dc06be3d/StateStoreId(opId=0,partId=0,name=default)-732271d8-03b3-4046-a911-5797804df25c/workingDir-3a85e625-e4fb-4a78-b668-941ca16cc7a2/000008.sst:
 No such file or directoryAccumulated backgrou [...]
    <TRUNCATED LOG>
    24/03/29 11:54:09 INFO ShutdownHookManager: Deleting directory 
/ephemeral/tmp/spark-b5dac908-59cc-4276-80f7-34dab79716b7/StateStoreId(opId=0,partId=0,name=default)-702d3c8f-245e-4119-a763-b8e963d07e7b
    24/03/29 06:47:12 INFO ShutdownHookManager: Deleting directory 
/ephemeral/tmp/spark-efd53c17-2b8a-4f80-aca0-b767dc06be3d/StateStoreId(opId=0,partId=4,name=default)-0eb30b1b-b92f-4744-aff6-85f9efd2bcf2
    24/03/29 06:47:12 INFO ShutdownHookManager: Deleting directory 
/ephemeral/tmp/streaming.metadata-d281c16c-89c7-49b3-b65a-6eb2de6ddb6f
    pthread lock: Invalid argument
    ```
    In the RocksDB source code, after this error is thrown, the background
thread sleeps for 1 second and then re-locks the original mutex:
    
    
https://github.com/facebook/rocksdb/blob/e46ab9d4f0a0e63bfc668421e2994efa918d6570/db/db_impl/db_impl_compaction_flush.cc#L2613
    
    From the RocksDB and ShutdownHookManager logs, we can see that `pthread
lock: Invalid argument` appears exactly 1 second after RocksDB reports the
error. So it is likely that this mutex re-lock is what fails.
    
    ### Why are the changes needed?
    
    Bug fix for a transient issue
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests should be enough.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #45863 from WweiL/SPARK-47722-rocksdb-cleanup.
    
    Authored-by: Wei Liu <wei....@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala  | 2 ++
 1 file changed, 2 insertions(+)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index fcefc1666f3a..c6fb9699cf33 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -886,6 +886,8 @@ class RocksDB(
       colFamilyNameToHandleMap.values.map(handle => handle.close)
       colFamilyNameToHandleMap.clear()
 
+      // Cancel and wait until all background work finishes
+      db.cancelAllBackgroundWork(true)
       // Close the DB instance
       db.close()
       db = null
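
The added `db.cancelAllBackgroundWork(true)` call drains RocksDB's background flush/compaction threads before `db.close()` tears down the state they depend on. As a rough analogue of that ordering (a minimal sketch using a plain Java executor as a stand-in for RocksDB's background threads, not the RocksDB API itself):

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Stand-in for RocksDB's background flush/compaction threads.
val pool = Executors.newFixedThreadPool(2)
val finishedFlushes = new AtomicInteger(0)

(1 to 4).foreach { _ =>
  pool.submit(new Runnable {
    def run(): Unit = {
      Thread.sleep(10) // simulate background I/O
      finishedFlushes.incrementAndGet()
    }
  })
}

// Analogue of db.cancelAllBackgroundWork(true): stop accepting new work
// and block until all in-flight background work has finished...
pool.shutdown()
pool.awaitTermination(5, TimeUnit.SECONDS)

// ...and only then is it safe to tear down the shared state the workers
// used (the analogue of db.close()). No worker can now touch it.
assert(finishedFlushes.get() == 4)
```

Without the `shutdown()`/`awaitTermination()` step, the teardown could race with a still-running worker, which mirrors the re-lock-after-destroy failure described above.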


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
