This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new d67e22826cd [SPARK-41339][SQL] Close and recreate RocksDB write batch instead of just clearing d67e22826cd is described below commit d67e22826cda41d732e010d73687e74fab60f4b6 Author: Adam Binford <adam...@gmail.com> AuthorDate: Thu Dec 1 15:50:13 2022 +0900 [SPARK-41339][SQL] Close and recreate RocksDB write batch instead of just clearing ### What changes were proposed in this pull request? Instead of just calling `writeBatch.clear`, close the write batch and recreate it. ### Why are the changes needed? A RocksDB `WriteBatch` (and by extension `WriteBatchWithIndex`) stores its underlying data in a `std::string`. Why? I'm not sure. But after a partition is finished, `writeBatch.clear()` is called (somewhat indirectly through a call to `store.abort`), presumably clearing the data in the `WriteBatch`. This calls `std::string::clear` followed by `std::string::resize` underneath the hood. However, neither of these two things actually reclaims native memory. All the memory allocated for ex [...] ### Does this PR introduce _any_ user-facing change? Fix for excess native memory usage. ### How was this patch tested? Existing UTs, not sure how to test for memory usage. Closes #38853 from Kimahriman/rocksdb-write-batch-close. 
Lead-authored-by: Adam Binford <adam...@gmail.com> Co-authored-by: centos <centos@adam-dev.novalocal> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../spark/sql/execution/streaming/state/RocksDB.scala | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 3e1bcbbbf0d..5acd20f49dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -63,7 +63,7 @@ class RocksDB( private val readOptions = new ReadOptions() // used for gets private val writeOptions = new WriteOptions().setSync(true) // wait for batched write to complete private val flushOptions = new FlushOptions().setWaitForFlush(true) // wait for flush to complete - private val writeBatch = new WriteBatchWithIndex(true) // overwrite multiple updates to a key + private var writeBatch = new WriteBatchWithIndex(true) // overwrite multiple updates to a key private val bloomFilter = new BloomFilter() private val tableFormatConfig = new BlockBasedTableConfig() @@ -135,7 +135,7 @@ class RocksDB( } // reset resources to prevent side-effects from previous loaded version closePrefixScanIterators() - writeBatch.clear() + resetWriteBatch() logInfo(s"Loaded $version") } catch { case t: Throwable => @@ -328,7 +328,7 @@ class RocksDB( */ def rollback(): Unit = { closePrefixScanIterators() - writeBatch.clear() + resetWriteBatch() numKeysOnWritingVersion = numKeysOnLoadedVersion release() logInfo(s"Rolled back to $loadedVersion") @@ -455,6 +455,13 @@ class RocksDB( prefixScanReuseIter.clear() } + /** Create a new WriteBatch, clear doesn't deallocate the native memory */ + private def resetWriteBatch(): Unit = { + writeBatch.clear() + writeBatch.close() + writeBatch = new WriteBatchWithIndex(true) + } + private def getDBProperty(property: String): Long = { db.getProperty(property).toLong } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org