This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d67e22826cd [SPARK-41339][SQL] Close and recreate RocksDB write batch 
instead of just clearing
d67e22826cd is described below

commit d67e22826cda41d732e010d73687e74fab60f4b6
Author: Adam Binford <adam...@gmail.com>
AuthorDate: Thu Dec 1 15:50:13 2022 +0900

    [SPARK-41339][SQL] Close and recreate RocksDB write batch instead of just 
clearing
    
    ### What changes were proposed in this pull request?
    
    Instead of just calling `writeBatch.clear`, close the write batch and 
recreate it.
    
    ### Why are the changes needed?
    
    A RocksDB `WriteBatch` (and by extension `WriteBatchWithIndex`) stores its 
underlying data in a `std::string`; the reason for that choice is unclear. After a 
partition is finished, `writeBatch.clear()` is called (somewhat indirectly through a call to 
`store.abort`), presumably clearing the data in the `WriteBatch`. This calls 
`std::string::clear` followed by `std::string::resize` under the hood. 
However, neither of these two things actually reclaims native memory. All the 
memory allocated for ex [...]
    
    ### Does this PR introduce _any_ user-facing change?
    
    No behavior change; this is a fix for excess native memory usage.
    
    ### How was this patch tested?
    
    Existing unit tests; a direct, reliable test for native memory usage is not practical.
    
    Closes #38853 from Kimahriman/rocksdb-write-batch-close.
    
    Lead-authored-by: Adam Binford <adam...@gmail.com>
    Co-authored-by: centos <centos@adam-dev.novalocal>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../spark/sql/execution/streaming/state/RocksDB.scala       | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 3e1bcbbbf0d..5acd20f49dc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -63,7 +63,7 @@ class RocksDB(
   private val readOptions = new ReadOptions()  // used for gets
   private val writeOptions = new WriteOptions().setSync(true)  // wait for 
batched write to complete
   private val flushOptions = new FlushOptions().setWaitForFlush(true)  // wait 
for flush to complete
-  private val writeBatch = new WriteBatchWithIndex(true)  // overwrite 
multiple updates to a key
+  private var writeBatch = new WriteBatchWithIndex(true)  // overwrite 
multiple updates to a key
 
   private val bloomFilter = new BloomFilter()
   private val tableFormatConfig = new BlockBasedTableConfig()
@@ -135,7 +135,7 @@ class RocksDB(
       }
       // reset resources to prevent side-effects from previous loaded version
       closePrefixScanIterators()
-      writeBatch.clear()
+      resetWriteBatch()
       logInfo(s"Loaded $version")
     } catch {
       case t: Throwable =>
@@ -328,7 +328,7 @@ class RocksDB(
    */
   def rollback(): Unit = {
     closePrefixScanIterators()
-    writeBatch.clear()
+    resetWriteBatch()
     numKeysOnWritingVersion = numKeysOnLoadedVersion
     release()
     logInfo(s"Rolled back to $loadedVersion")
@@ -455,6 +455,13 @@ class RocksDB(
     prefixScanReuseIter.clear()
   }
 
+  /** Create a new WriteBatch, clear doesn't deallocate the native memory */
+  private def resetWriteBatch(): Unit = {
+    writeBatch.clear()
+    writeBatch.close()
+    writeBatch = new WriteBatchWithIndex(true)
+  }
+
   private def getDBProperty(property: String): Long = {
     db.getProperty(property).toLong
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to