This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 08640961e3b [SPARK-45472][SS] RocksDB State Store Doesn't Need to Recheck checkpoint path existence 08640961e3b is described below commit 08640961e3bad7de38ed3358df8706bad028c27a Author: Siying Dong <siying.d...@databricks.com> AuthorDate: Tue Oct 10 11:24:31 2023 +0900 [SPARK-45472][SS] RocksDB State Store Doesn't Need to Recheck checkpoint path existence ### What changes were proposed in this pull request? In RocksDBFileManager, we add a variable to indicate that the root path has already been checked and created if it did not exist, so that we don't need to recheck it a second time. ### Why are the changes needed? Right now, every time RocksDB.load() is called, we check whether the checkpoint directory exists and create it if it does not. This is relatively expensive and shows up in performance profiling. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing CI tests cover it. ### Was this patch authored or co-authored using generative AI tooling? No Closes #43299 from siying/rootPath. 
Authored-by: Siying Dong <siying.d...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../execution/streaming/state/RocksDBFileManager.scala | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index eae9aac3c0a..3d0745c2fb3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -146,10 +146,15 @@ class RocksDBFileManager( private def codec = CompressionCodec.createCodec(sparkConf, codecName) + @volatile private var rootDirChecked: Boolean = false + def getChangeLogWriter(version: Long): StateStoreChangelogWriter = { - val rootDir = new Path(dfsRootDir) val changelogFile = dfsChangelogFile(version) - if (!fm.exists(rootDir)) fm.mkdirs(rootDir) + if (!rootDirChecked) { + val rootDir = new Path(dfsRootDir) + if (!fm.exists(rootDir)) fm.mkdirs(rootDir) + rootDirChecked = true + } val changelogWriter = new StateStoreChangelogWriter(fm, changelogFile, codec) changelogWriter } @@ -193,8 +198,11 @@ class RocksDBFileManager( // CheckpointFileManager.createAtomic API which doesn't auto-initialize parent directories. // Moreover, once we disable to track the number of keys, in which the numKeys is -1, we // still need to create the initial dfs root directory anyway. 
- val path = new Path(dfsRootDir) - if (!fm.exists(path)) fm.mkdirs(path) + if (!rootDirChecked) { + val path = new Path(dfsRootDir) + if (!fm.exists(path)) fm.mkdirs(path) + rootDirChecked = true + } } zipToDfsFile(localOtherFiles :+ metadataFile, dfsBatchZipFile(version)) logInfo(s"Saved checkpoint file for version $version") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org