This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new a0c029c18b0 [SPARK-44878][SS] Disable strict limit for RocksDB write manager to avoid insertion exception on cache full

a0c029c18b0 is described below

commit a0c029c18b0e01cd593cb4dc47c7869d36599592
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Wed Aug 23 13:13:11 2023 +0900

    [SPARK-44878][SS] Disable strict limit for RocksDB write manager to avoid insertion exception on cache full

### What changes were proposed in this pull request?

Disable the strict limit for the RocksDB write manager to avoid an insertion exception when the cache is full.

### Why are the changes needed?

In some cases, when the memory limit is reached, we see the following exception on insert/get:

```
org.apache.spark.SparkException: Job aborted due to stage failure: Task 42 in stage 9.0 failed 4 times, most recent failure: Lost task 42.3 in stage 9.0 (TID 2950) (96.104.176.55 executor 0): org.rocksdb.RocksDBException: Insert failed due to LRU cache being full.
    at org.rocksdb.RocksDB.get(Native Method)
    at org.rocksdb.RocksDB.get(RocksDB.java:2053)
    at org.apache.spark.sql.execution.streaming.state.RocksDB.get(RocksDB.scala:299)
    at org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider$RocksDBStateStore.get(RocksDBStateStoreProvider.scala:55)
```

This exception is thrown by RocksDB when the strict memory limit is enforced - https://github.com/facebook/rocksdb/blob/0fa0c97d3e9ac5dfc2e7ae94834b0850cdef5df7/cache/lru_cache.cc#L394

The issue can only occur in strict mode, as described here - https://github.com/facebook/rocksdb/issues/5048#issuecomment-470788792

There is also a pending RocksDB issue around this - https://github.com/facebook/rocksdb/issues/8670

A possibly relevant fix exists, though it is not clear whether it addresses the issue completely - https://github.com/facebook/rocksdb/pull/6619 (cc - siying)

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

Existing tests

Closes #42567 from anishshri-db/task/SPARK-44878.

Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 docs/structured-streaming-programming-guide.md               | 4 ++++
 .../sql/execution/streaming/state/RocksDBMemoryManager.scala | 8 +++++++-
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index dc25adbdfd3..8ec4d620052 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -2394,6 +2394,10 @@ If you want to cap RocksDB memory usage in your Spark Structured Streaming deplo
 You can also determine the max allowed memory for RocksDB instances by setting the `spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB` value to a static number or as a fraction of the physical memory available on the node.
 Limits for individual RocksDB instances can also be configured by setting `spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB` and `spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber` to the required values. By default, RocksDB internal defaults are used for these settings.
 
+Note that the `boundedMemoryUsage` config will enable a soft limit on the total memory usage for RocksDB.
+So the total memory used by RocksDB can temporarily exceed this value if all blocks allocated to higher level readers are in use.
+Enabling a strict limit is not possible at this time since it will cause query failures and we do not support re-balancing of the state across additional nodes.
+
 ##### RocksDB State Store Changelog Checkpointing
 In newer version of Spark, changelog checkpointing is introduced for RocksDB state store.
 The traditional checkpointing mechanism for RocksDB State Store is incremental snapshot checkpointing, where the manifest files and newly generated RocksDB SST files of RocksDB instances are uploaded to a durable storage.
 Instead of uploading data files of RocksDB instances, changelog checkpointing uploads changes made to the state since the last checkpoint for durability.

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala
index 4766dcd0b0c..38b9dc56838 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala
@@ -50,7 +50,13 @@ object RocksDBMemoryManager extends Logging {
       logInfo(s"Creating RocksDB state store LRU cache with " +
         s"total_size=$totalMemoryUsageInBytes")
-      cache = new LRUCache(totalMemoryUsageInBytes, -1, true, conf.highPriorityPoolRatio)
+      // SPARK-44878 - avoid using strict limit to prevent insertion exception on cache full.
+      // Please refer to RocksDB issue here - https://github.com/facebook/rocksdb/issues/8670
+      cache = new LRUCache(totalMemoryUsageInBytes,
+        -1,
+        /* strictCapacityLimit = */false,
+        conf.highPriorityPoolRatio)
+
       writeBufferManager = new WriteBufferManager(
         (totalMemoryUsageInBytes * conf.writeBufferCacheRatio).toLong,
         cache)
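To make the strict-vs-soft distinction concrete, here is a minimal toy model in plain Java. This is not RocksDB code (the real block cache is sharded and pins blocks held by readers); it only illustrates the failure mode: with a strict capacity limit, an insert that cannot be satisfied by evicting LRU entries is rejected, which is what surfaces as `Insert failed due to LRU cache being full` in the stack trace above. With a soft limit, the commit's choice, usage may temporarily exceed capacity instead.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

public class StrictVsSoftLru {
    /** Toy single-shard LRU cache; each entry carries a memory "charge". */
    static final class LruCache {
        final long capacity;
        final boolean strictCapacityLimit;
        long usage = 0;
        // access-order LinkedHashMap: iteration starts at the least-recently-used entry
        final LinkedHashMap<String, Long> entries = new LinkedHashMap<>(16, 0.75f, true);

        LruCache(long capacity, boolean strictCapacityLimit) {
            this.capacity = capacity;
            this.strictCapacityLimit = strictCapacityLimit;
        }

        /** Returns false when a strict-limit cache must reject the insert. */
        boolean insert(String key, long charge) {
            // Evict least-recently-used entries until the new entry fits.
            Iterator<Map.Entry<String, Long>> it = entries.entrySet().iterator();
            while (usage + charge > capacity && it.hasNext()) {
                usage -= it.next().getValue();
                it.remove();
            }
            if (usage + charge > capacity && strictCapacityLimit) {
                return false; // analogous to "Insert failed due to LRU cache being full"
            }
            entries.put(key, charge);
            usage += charge; // under a soft limit, usage may now exceed capacity
            return true;
        }
    }

    public static void main(String[] args) {
        LruCache strict = new LruCache(100, true);
        LruCache soft = new LruCache(100, false);
        // An entry larger than the whole cache: no amount of eviction makes room.
        System.out.println("strict accepts oversized entry: " + strict.insert("block", 150));
        System.out.println("soft accepts oversized entry:   " + soft.insert("block", 150));
        System.out.println("soft cache usage (over capacity): " + soft.usage);
    }
}
```

In RocksJava the same choice is the third constructor argument, as the diff shows: `new LRUCache(capacity, numShardBits, strictCapacityLimit, highPriorityPoolRatio)`.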
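For context, the bounded-memory settings referenced by the docs change above might be combined as follows in `spark-defaults.conf`. The values are purely illustrative, not recommendations; only the config key names come from the documentation text in this commit.

```
spark.sql.streaming.stateStore.providerClass                 org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage    true
spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB      500
spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB     64
spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber  3
```

With `boundedMemoryUsage` enabled, all RocksDB instances on an executor share one LRU cache and write buffer manager capped at `maxMemoryUsageMB`; per this commit, that cap is a soft limit.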