This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a0c029c18b0 [SPARK-44878][SS] Disable strict limit for RocksDB write manager to avoid insertion exception on cache full
a0c029c18b0 is described below

commit a0c029c18b0e01cd593cb4dc47c7869d36599592
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Wed Aug 23 13:13:11 2023 +0900

    [SPARK-44878][SS] Disable strict limit for RocksDB write manager to avoid insertion exception on cache full
    
    ### What changes were proposed in this pull request?
    Disable strict limit for RocksDB write manager to avoid insertion exception on cache full
    
    ### Why are the changes needed?
    In some cases, when the memory limit is reached, we see the following exception on insert/get:
    
    ```
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 42 in stage 9.0 failed 4 times, most recent failure: Lost task 42.3 in stage 9.0 (TID 2950) (96.104.176.55 executor 0): org.rocksdb.RocksDBException: Insert failed due to LRU cache being full.
            at org.rocksdb.RocksDB.get(Native Method)
            at org.rocksdb.RocksDB.get(RocksDB.java:2053)
            at org.apache.spark.sql.execution.streaming.state.RocksDB.get(RocksDB.scala:299)
            at org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider$RocksDBStateStore.get(RocksDBStateStoreProvider.scala:55)
    ```
    
    This exception appears to be thrown by RocksDB when the strict memory limit is enabled - https://github.com/facebook/rocksdb/blob/0fa0c97d3e9ac5dfc2e7ae94834b0850cdef5df7/cache/lru_cache.cc#L394
    
    The issue appears to occur only in strict mode, as described here - https://github.com/facebook/rocksdb/issues/5048#issuecomment-470788792
    
    There is also an open RocksDB issue tracking this behavior - https://github.com/facebook/rocksdb/issues/8670
    
    A potentially relevant fix exists, though it is not clear whether it addresses the issue completely - https://github.com/facebook/rocksdb/pull/6619
    (cc - siying )
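
    To illustrate the failure mode, here is a minimal, self-contained sketch of an LRU cache with a byte budget - not RocksDB's actual `LRUCache` implementation, and the class and method names are hypothetical. With a strict capacity limit, an insert fails once all cached blocks are pinned by readers; with a soft limit, the entry is admitted and usage temporarily overshoots the budget:

    ```java
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Set;

    // Hypothetical sketch of a byte-budget LRU cache; NOT RocksDB's LRUCache.
    class SketchLruCache {
        private final LinkedHashMap<String, Long> entries = new LinkedHashMap<>();
        private final Set<String> pinned = new HashSet<>(); // blocks held by readers, not evictable
        private final long capacityBytes;
        private final boolean strictCapacityLimit;
        private long usage = 0;

        SketchLruCache(long capacityBytes, boolean strictCapacityLimit) {
            this.capacityBytes = capacityBytes;
            this.strictCapacityLimit = strictCapacityLimit;
        }

        void pin(String key) { pinned.add(key); }

        long usageBytes() { return usage; }

        void insert(String key, long sizeBytes) {
            // Evict least-recently-inserted unpinned entries until the new entry fits.
            Iterator<Map.Entry<String, Long>> it = entries.entrySet().iterator();
            while (usage + sizeBytes > capacityBytes && it.hasNext()) {
                Map.Entry<String, Long> victim = it.next();
                if (pinned.contains(victim.getKey())) {
                    continue; // pinned blocks cannot be evicted
                }
                usage -= victim.getValue();
                it.remove();
            }
            if (strictCapacityLimit && usage + sizeBytes > capacityBytes) {
                // Strict mode: reject the insert, analogous to
                // "org.rocksdb.RocksDBException: Insert failed due to LRU cache being full."
                throw new IllegalStateException("Insert failed due to LRU cache being full.");
            }
            entries.put(key, sizeBytes); // soft mode may overshoot capacityBytes
            usage += sizeBytes;
        }
    }
    ```

    With `strictCapacityLimit` enabled, the insert throws once every block is pinned; with it disabled, the insert succeeds and usage temporarily exceeds the budget, which is the trade-off this PR opts into.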
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing tests
    
    Closes #42567 from anishshri-db/task/SPARK-44878.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 docs/structured-streaming-programming-guide.md                    | 4 ++++
 .../sql/execution/streaming/state/RocksDBMemoryManager.scala      | 8 +++++++-
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index dc25adbdfd3..8ec4d620052 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -2394,6 +2394,10 @@ If you want to cap RocksDB memory usage in your Spark Structured Streaming deplo
 You can also determine the max allowed memory for RocksDB instances by setting the `spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB` value to a static number or as a fraction of the physical memory available on the node.
 Limits for individual RocksDB instances can also be configured by setting `spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB` and `spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber` to the required values. By default, RocksDB internal defaults are used for these settings.
 
+Note that the `boundedMemoryUsage` config will enable a soft limit on the total memory usage for RocksDB.
+So the total memory used by RocksDB can temporarily exceed this value if all blocks allocated to higher level readers are in use.
+Enabling a strict limit is not possible at this time since it will cause query failures and we do not support re-balancing of the state across additional nodes.
+
 ##### RocksDB State Store Changelog Checkpointing
 In newer version of Spark, changelog checkpointing is introduced for RocksDB state store. The traditional checkpointing mechanism for RocksDB State Store is incremental snapshot checkpointing, where the manifest files and newly generated RocksDB SST files of RocksDB instances are uploaded to a durable storage.
 Instead of uploading data files of RocksDB instances, changelog checkpointing uploads changes made to the state since the last checkpoint for durability.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala
index 4766dcd0b0c..38b9dc56838 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala
@@ -50,7 +50,13 @@ object RocksDBMemoryManager extends Logging {
         logInfo(s"Creating RocksDB state store LRU cache with " +
           s"total_size=$totalMemoryUsageInBytes")
 
-        cache = new LRUCache(totalMemoryUsageInBytes, -1, true, conf.highPriorityPoolRatio)
+        // SPARK-44878 - avoid using strict limit to prevent insertion exception on cache full.
+        // Please refer to RocksDB issue here - https://github.com/facebook/rocksdb/issues/8670
+        cache = new LRUCache(totalMemoryUsageInBytes,
+          -1,
+          /* strictCapacityLimit = */false,
+          conf.highPriorityPoolRatio)
+
         writeBufferManager = new WriteBufferManager(
           (totalMemoryUsageInBytes * conf.writeBufferCacheRatio).toLong,
           cache)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
