Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]
HeartSaVioR closed pull request #50612: [SPARK-51823][SS] Add config to not persist state store on executors URL: https://github.com/apache/spark/pull/50612 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]
HeartSaVioR commented on PR #50612: URL: https://github.com/apache/spark/pull/50612#issuecomment-2833674696 Thanks! Merging to master. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]
Kimahriman commented on code in PR #50612: URL: https://github.com/apache/spark/pull/50612#discussion_r2051353142 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala: ## @@ -223,6 +223,48 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest } } + testWithColumnFamilies("SPARK-51823: unload state stores on commit", +TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled => +withTempDir { dir => + withSQLConf( +(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName), +(SQLConf.CHECKPOINT_LOCATION.key -> dir.getCanonicalPath), +(SQLConf.SHUFFLE_PARTITIONS.key -> "1"), +(SQLConf.STATE_STORE_UNLOAD_ON_COMMIT.key -> "true")) { +val inputData = MemoryStream[Int] + +val query = inputData.toDS().toDF("value") + .select($"value") + .groupBy($"value") + .agg(count("*")) + .writeStream + .format("console") + .outputMode("complete") + .start() +try { + inputData.addData(1, 2) + inputData.addData(2, 3) + query.processAllAvailable() + + // StateStore should be unloaded, so its tmp dir shouldn't exist + for (file <- new File(Utils.getLocalDir(sparkConf)).listFiles()) { +assert(!file.getName().startsWith("StateStore")) Review Comment: Still doesn't work, too many other suites not cleaning up after themselves, need to think about how else to verify -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]
anishshri-db commented on code in PR #50612: URL: https://github.com/apache/spark/pull/50612#discussion_r2049642043 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala: ## @@ -227,6 +228,32 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter { } } + test("SPARK-X: unload on commit") { Review Comment: Can we add the actual SPARK ticket number here ? ## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ## @@ -2519,6 +2519,16 @@ object SQLConf { .stringConf .createWithDefault(CompressionCodec.LZ4) + val STATE_STORE_UNLOAD_ON_COMMIT = +buildConf("spark.sql.streaming.stateStore.unloadOnCommit") + .internal() + .doc("When true, Spark will synchronously run maintenance and then close each StateStore " + +"instance on task completion. This reduce overhead involved in keeping every StateStore " + Review Comment: nit: `reduces overhead in` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]
Kimahriman commented on code in PR #50612: URL: https://github.com/apache/spark/pull/50612#discussion_r2050810199 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala: ## @@ -223,6 +223,48 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest } } + testWithColumnFamilies("SPARK-51823: unload state stores on commit", +TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled => +withTempDir { dir => + withSQLConf( +(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName), +(SQLConf.CHECKPOINT_LOCATION.key -> dir.getCanonicalPath), +(SQLConf.SHUFFLE_PARTITIONS.key -> "1"), +(SQLConf.STATE_STORE_UNLOAD_ON_COMMIT.key -> "true")) { +val inputData = MemoryStream[Int] + +val query = inputData.toDS().toDF("value") + .select($"value") + .groupBy($"value") + .agg(count("*")) + .writeStream + .format("console") + .outputMode("complete") + .start() +try { + inputData.addData(1, 2) + inputData.addData(2, 3) + query.processAllAvailable() + + // StateStore should be unloaded, so its tmp dir shouldn't exist + for (file <- new File(Utils.getLocalDir(sparkConf)).listFiles()) { +assert(!file.getName().startsWith("StateStore")) Review Comment: Fixed by adding an `afterEach` to clear the state store. The parent `StreamTest` only does an `afterAll` to clear the state store, not sure if that one should just be updated to `afterEach` instead ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala: ## @@ -223,6 +223,48 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest } } + testWithColumnFamilies("SPARK-51823: unload state stores on commit", +TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled => +withTempDir { dir => + withSQLConf( +(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName), +(SQLConf.CHECKPOINT_LOCATION.key -> dir.getCanonicalPath), +(SQLConf.SHUFFLE_PARTITIONS.key -> "1"), +(SQLConf.STATE_STORE_UNLOAD_ON_COMMIT.key -> "true")) { +val inputData = MemoryStream[Int] + +val query = inputData.toDS().toDF("value") + .select($"value") + .groupBy($"value") + .agg(count("*")) + .writeStream + .format("console") + .outputMode("complete") + .start() +try { + inputData.addData(1, 2) + inputData.addData(2, 3) + query.processAllAvailable() + + // StateStore should be unloaded, so its tmp dir shouldn't exist + for (file <- new File(Utils.getLocalDir(sparkConf)).listFiles()) { +assert(!file.getName().startsWith("StateStore")) Review Comment: Fixed by adding an `afterEach` to clear the state store in this suite. The parent `StreamTest` only does an `afterAll` to clear the state store, not sure if that one should just be updated to `afterEach` instead -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]
Kimahriman commented on code in PR #50612: URL: https://github.com/apache/spark/pull/50612#discussion_r2050746015 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala: ## @@ -223,6 +223,48 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest } } + testWithColumnFamilies("SPARK-51823: unload state stores on commit", +TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled => +withTempDir { dir => + withSQLConf( +(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName), +(SQLConf.CHECKPOINT_LOCATION.key -> dir.getCanonicalPath), +(SQLConf.SHUFFLE_PARTITIONS.key -> "1"), +(SQLConf.STATE_STORE_UNLOAD_ON_COMMIT.key -> "true")) { +val inputData = MemoryStream[Int] + +val query = inputData.toDS().toDF("value") + .select($"value") + .groupBy($"value") + .agg(count("*")) + .writeStream + .format("console") + .outputMode("complete") + .start() +try { + inputData.addData(1, 2) + inputData.addData(2, 3) + query.processAllAvailable() + + // StateStore should be unloaded, so its tmp dir shouldn't exist + for (file <- new File(Utils.getLocalDir(sparkConf)).listFiles()) { +assert(!file.getName().startsWith("StateStore")) Review Comment: Oof this only works if I run this test by itself. When running the whole Suite, all the other tests leave StateStore data on disk so this check sees those -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]
Kimahriman commented on code in PR #50612: URL: https://github.com/apache/spark/pull/50612#discussion_r2050515653 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala: ## @@ -227,6 +228,32 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter { } } + test("SPARK-51823: unload on commit") { Review Comment: Added a basic integration test, let me know if there's anything you want to add to it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]
Kimahriman commented on PR #50612: URL: https://github.com/apache/spark/pull/50612#issuecomment-2813777136 > @Kimahriman - yea and with 4.0, the only additional cost for `doMaintenance` would be around uploading the snapshot and deleting old versions. Both likely won't be expensive on a per batch basis (mostly a no-op for most batches) but you could see some spikes in latency when both these operations are performed. (I guess thats acceptable in your case rather than keeping the resources running) Yeah I thought about trying to use the background maintenance to clean up the state but that just seemed hacky and race condition prone, and I'm already saying I care less about latency in this mode -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]
Kimahriman commented on PR #50612: URL: https://github.com/apache/spark/pull/50612#issuecomment-2811589454 > @Kimahriman Thanks for submitting this change. I just took a quick look. Please can you share more on the motivation for this and your use case? I would like to understand the issue you observed, the type of stateful query you ran, the state store provider you used and your cluster setup. There's a little more information in [the jira issue](https://issues.apache.org/jira/browse/SPARK-51823). The quick background is we do relatively large streaming deduplications and streaming aggregations (total state size can be in the 10s to 100s of TiB) with up to 10s of thousands of partitions. We've been dealing with issues related to this for a long time, and over time some fixes come out to make this situation better, but at the end of the day they are mostly band-aids to this type of scenario. We use the RocksDB state store for most things, and use bounded memory to limit resource utilization. This is the result of finally digging into why some of our partitions were taking over an hour to create a RocksDB snapshot to upload. This led us to find a lot of things potentially contributing to this: - The level-0 cache is pinned for all opened RocksDB instances on an executor. This can easily be several hundred on a single executor, and all that memory can't be freed even when those instances are not being used. This could be fixed by not pinning the level-0 cache - There seemed to be contention for background compaction, as we would see the checkpoint process start, and then nothing happen for that partition for an hour, and then finally compaction kick in and the checkpoint successfully created. This could be improved by increasing background threads But at the end of the day these are all workarounds to the problem that the existing stateful streaming approach doesn't work well with high-latency, high-volume queries, it's more designed around low-latency, low-volume queries. Additionally, we use a dynamic allocation setup, so it is very likely most of our executors will be deallocated before the next batch runs, so keeping the state stores open does nothing but waste resources. This change would also help the HDFSBackedStateStore have more use cases again and help some people avoid the added complexity of using RocksDB just to deal with all the state stores being kept on a small number of executors. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]
anishshri-db commented on code in PR #50612: URL: https://github.com/apache/spark/pull/50612#discussion_r2050005386 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala: ## @@ -227,6 +228,32 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter { } } + test("SPARK-51823: unload on commit") { Review Comment: Should we add an integration test under RocksDBStateStoreIntegrationSuite with the config enabled ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]
Kimahriman commented on code in PR #50612: URL: https://github.com/apache/spark/pull/50612#discussion_r2049770733 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala: ## @@ -227,6 +228,32 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter { } } + test("SPARK-X: unload on commit") { Review Comment: Whoops yeah fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]
Kimahriman commented on code in PR #50612: URL: https://github.com/apache/spark/pull/50612#discussion_r2049770928 ## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ## @@ -2519,6 +2519,16 @@ object SQLConf { .stringConf .createWithDefault(CompressionCodec.LZ4) + val STATE_STORE_UNLOAD_ON_COMMIT = +buildConf("spark.sql.streaming.stateStore.unloadOnCommit") + .internal() + .doc("When true, Spark will synchronously run maintenance and then close each StateStore " + +"instance on task completion. This reduce overhead involved in keeping every StateStore " + Review Comment: Updated a little more with the doc -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]
anishshri-db commented on PR #50612: URL: https://github.com/apache/spark/pull/50612#issuecomment-2813774552 @Kimahriman - yea and with 4.0, the only additional cost for `doMaintenance` would be around uploading the snapshot and deleting old versions. Both likely won't be expensive on a per batch basis (mostly a no-op for most batches) but you could see some spikes in latency when both these operations are performed. (I guess thats acceptable in your case rather than keeping the resources running) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]
Kimahriman commented on PR #50612: URL: https://github.com/apache/spark/pull/50612#issuecomment-2813756089 > @Kimahriman - if you are removing/adding executors per batch, then locality probably is not very useful. Yeah this includes not even reporting to the coordinator being active since that's just used for locality. > But I'm curious about the perf diff you see with large state (especially as the large state grows) - I guess it might not matter a whole lot - because even today - you are doing a fresh pull for each batch ? Yeah generally for us there's no performance drop since many of our executors will end up get deallocated between batches anyway, so we have to redownload the state each batch regardless. The long pole in the tent for us is generally the time it takes to create and upload a checkpoint. This is partially due to issues where a checkpoint is generally created every batch for RocksDB even with the changelog enabled, because of the hard coded 10k row check as well as not initializing the latest snapshot version on a fresh load (both of which appear to be fixed for the 4.0 release) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]
anishshri-db commented on PR #50612: URL: https://github.com/apache/spark/pull/50612#issuecomment-2813739210 @Kimahriman - if you are removing/adding executors per batch, then locality probably is not very useful. But I'm curious about the perf diff you see with large state (especially as the large state grows) - I guess it might not matter a whole lot - because even today - you are doing a fresh pull for each batch ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org