Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]

2025-04-27 Thread via GitHub


HeartSaVioR closed pull request #50612: [SPARK-51823][SS] Add config to not 
persist state store on executors
URL: https://github.com/apache/spark/pull/50612


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]

2025-04-27 Thread via GitHub


HeartSaVioR commented on PR #50612:
URL: https://github.com/apache/spark/pull/50612#issuecomment-2833674696

   Thanks! Merging to master.





Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]

2025-04-18 Thread via GitHub


Kimahriman commented on code in PR #50612:
URL: https://github.com/apache/spark/pull/50612#discussion_r2051353142


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala:
##
@@ -223,6 +223,48 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
 }
   }
 
+  testWithColumnFamilies("SPARK-51823: unload state stores on commit",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    withTempDir { dir =>
+      withSQLConf(
+        (SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName),
+        (SQLConf.CHECKPOINT_LOCATION.key -> dir.getCanonicalPath),
+        (SQLConf.SHUFFLE_PARTITIONS.key -> "1"),
+        (SQLConf.STATE_STORE_UNLOAD_ON_COMMIT.key -> "true")) {
+        val inputData = MemoryStream[Int]
+
+        val query = inputData.toDS().toDF("value")
+          .select($"value")
+          .groupBy($"value")
+          .agg(count("*"))
+          .writeStream
+          .format("console")
+          .outputMode("complete")
+          .start()
+        try {
+          inputData.addData(1, 2)
+          inputData.addData(2, 3)
+          query.processAllAvailable()
+
+          // StateStore should be unloaded, so its tmp dir shouldn't exist
+          for (file <- new File(Utils.getLocalDir(sparkConf)).listFiles()) {
+            assert(!file.getName().startsWith("StateStore"))

Review Comment:
   Still doesn't work; too many other suites don't clean up after themselves. I 
need to think about how else to verify this.






Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]

2025-04-18 Thread via GitHub


anishshri-db commented on code in PR #50612:
URL: https://github.com/apache/spark/pull/50612#discussion_r2049642043


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala:
##
@@ -227,6 +228,32 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter {
 }
   }
 
+  test("SPARK-X: unload on commit") {

Review Comment:
   Can we add the actual SPARK ticket number here ?



##
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##
@@ -2519,6 +2519,16 @@ object SQLConf {
   .stringConf
   .createWithDefault(CompressionCodec.LZ4)
 
+  val STATE_STORE_UNLOAD_ON_COMMIT =
+    buildConf("spark.sql.streaming.stateStore.unloadOnCommit")
+      .internal()
+      .doc("When true, Spark will synchronously run maintenance and then close each StateStore " +
+        "instance on task completion. This reduce overhead involved in keeping every StateStore " +

Review Comment:
   nit: `reduces overhead in`
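For reference, the flag introduced in this diff could be enabled when building the session. This is only a sketch against the config name shown in the diff above; the `appName` and overall setup are illustrative, and the option assumes a Spark build that includes SPARK-51823:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative session setup; the app name is a placeholder.
val spark = SparkSession.builder()
  .appName("unload-on-commit-demo")
  // Unload and run maintenance for each StateStore synchronously at task end,
  // instead of keeping providers cached on executors between batches.
  .config("spark.sql.streaming.stateStore.unloadOnCommit", "true")
  .getOrCreate()
```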






Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]

2025-04-18 Thread via GitHub


Kimahriman commented on code in PR #50612:
URL: https://github.com/apache/spark/pull/50612#discussion_r2050810199


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala:
##
@@ -223,6 +223,48 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
 }
   }
 
+  testWithColumnFamilies("SPARK-51823: unload state stores on commit",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    withTempDir { dir =>
+      withSQLConf(
+        (SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName),
+        (SQLConf.CHECKPOINT_LOCATION.key -> dir.getCanonicalPath),
+        (SQLConf.SHUFFLE_PARTITIONS.key -> "1"),
+        (SQLConf.STATE_STORE_UNLOAD_ON_COMMIT.key -> "true")) {
+        val inputData = MemoryStream[Int]
+
+        val query = inputData.toDS().toDF("value")
+          .select($"value")
+          .groupBy($"value")
+          .agg(count("*"))
+          .writeStream
+          .format("console")
+          .outputMode("complete")
+          .start()
+        try {
+          inputData.addData(1, 2)
+          inputData.addData(2, 3)
+          query.processAllAvailable()
+
+          // StateStore should be unloaded, so its tmp dir shouldn't exist
+          for (file <- new File(Utils.getLocalDir(sparkConf)).listFiles()) {
+            assert(!file.getName().startsWith("StateStore"))

Review Comment:
   Fixed by adding an `afterEach` to clear the state store in this suite. The 
parent `StreamTest` only does an `afterAll` to clear the state store; not sure 
whether that one should just be updated to `afterEach` instead.
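A minimal sketch of the cleanup described here, assuming a ScalaTest suite and that `StateStore.stop()` is the hook used to unload loaded providers (the trait name is illustrative):

```scala
import org.scalatest.{BeforeAndAfterEach, Suite}
import org.apache.spark.sql.execution.streaming.state.StateStore

// Illustrative trait: mixes per-test StateStore cleanup into a suite.
trait StateStoreCleanup extends BeforeAndAfterEach { self: Suite =>
  override protected def afterEach(): Unit = {
    try {
      // Unload any state store providers left loaded by the previous test.
      StateStore.stop()
    } finally {
      super.afterEach()
    }
  }
}
```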







Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]

2025-04-18 Thread via GitHub


Kimahriman commented on code in PR #50612:
URL: https://github.com/apache/spark/pull/50612#discussion_r2050746015


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala:
##
@@ -223,6 +223,48 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
 }
   }
 
+  testWithColumnFamilies("SPARK-51823: unload state stores on commit",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    withTempDir { dir =>
+      withSQLConf(
+        (SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName),
+        (SQLConf.CHECKPOINT_LOCATION.key -> dir.getCanonicalPath),
+        (SQLConf.SHUFFLE_PARTITIONS.key -> "1"),
+        (SQLConf.STATE_STORE_UNLOAD_ON_COMMIT.key -> "true")) {
+        val inputData = MemoryStream[Int]
+
+        val query = inputData.toDS().toDF("value")
+          .select($"value")
+          .groupBy($"value")
+          .agg(count("*"))
+          .writeStream
+          .format("console")
+          .outputMode("complete")
+          .start()
+        try {
+          inputData.addData(1, 2)
+          inputData.addData(2, 3)
+          query.processAllAvailable()
+
+          // StateStore should be unloaded, so its tmp dir shouldn't exist
+          for (file <- new File(Utils.getLocalDir(sparkConf)).listFiles()) {
+            assert(!file.getName().startsWith("StateStore"))

Review Comment:
   Oof, this only works if I run this test by itself. When running the whole 
suite, all the other tests leave StateStore data on disk, so this check sees 
those files.






Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]

2025-04-18 Thread via GitHub


Kimahriman commented on code in PR #50612:
URL: https://github.com/apache/spark/pull/50612#discussion_r2050515653


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala:
##
@@ -227,6 +228,32 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter {
 }
   }
 
+  test("SPARK-51823: unload on commit") {

Review Comment:
   Added a basic integration test; let me know if there's anything you want to 
add to it.






Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]

2025-04-18 Thread via GitHub


Kimahriman commented on PR #50612:
URL: https://github.com/apache/spark/pull/50612#issuecomment-2813777136

   > @Kimahriman - yea and with 4.0, the only additional cost for 
`doMaintenance` would be around uploading the snapshot and deleting old 
versions. Both likely won't be expensive on a per batch basis (mostly a no-op 
for most batches) but you could see some spikes in latency when both these 
operations are performed. (I guess that's acceptable in your case rather than 
keeping the resources running)
   
   Yeah, I thought about trying to use the background maintenance to clean up 
the state, but that just seemed hacky and race-condition-prone, and I'm already 
saying I care less about latency in this mode.





Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]

2025-04-18 Thread via GitHub


Kimahriman commented on PR #50612:
URL: https://github.com/apache/spark/pull/50612#issuecomment-2811589454

   > @Kimahriman Thanks for submitting this change. I just took a quick look. 
Please can you share more on the motivation for this and your use case? I would 
like to understand the issue you observed, the type of stateful query you ran, 
the state store provider you used and your cluster setup.
   
   There's a little more information in [the jira 
issue](https://issues.apache.org/jira/browse/SPARK-51823). The quick background 
is we do relatively large streaming deduplications and streaming aggregations 
(total state size can be in the 10s to 100s of TiB) with up to 10s of thousands 
of partitions. We've been dealing with issues related to this for a long time, 
and over time some fixes have come out to make the situation better, but at the 
end of the day they are mostly band-aids for this type of scenario. We use the 
RocksDB state store for most things, and use bounded memory to limit resource 
utilization.
   
   This is the result of finally digging into why some of our partitions were 
taking over an hour to create a RocksDB snapshot to upload. This led us to find 
a lot of things potentially contributing to this:
   - The level-0 cache is pinned for all opened RocksDB instances on an 
executor. This can easily be several hundred on a single executor, and all that 
memory can't be freed even when those instances are not being used. This could 
be fixed by not pinning the level-0 cache
   - There seemed to be contention for background compaction: we would see the 
checkpoint process start, then nothing happen for that partition for an hour, 
and then finally compaction would kick in and the checkpoint would be 
successfully created. This could be improved by increasing background threads
   
   But at the end of the day these are all workarounds to the problem that the 
existing stateful streaming approach doesn't work well with high-latency, 
high-volume queries, it's more designed around low-latency, low-volume queries. 
Additionally, we use a dynamic allocation setup, so it is very likely most of 
our executors will be deallocated before the next batch runs, so keeping the 
state stores open does nothing but waste resources.
   
   This change would also make the HDFSBackedStateStore viable for more use 
cases again, and help some people avoid the added complexity of using RocksDB 
just to deal with all the state stores being kept on a small number of 
executors.





Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]

2025-04-17 Thread via GitHub


anishshri-db commented on code in PR #50612:
URL: https://github.com/apache/spark/pull/50612#discussion_r2050005386


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala:
##
@@ -227,6 +228,32 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter {
 }
   }
 
+  test("SPARK-51823: unload on commit") {

Review Comment:
   Should we add an integration test under RocksDBStateStoreIntegrationSuite 
with the config enabled ?






Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]

2025-04-17 Thread via GitHub


Kimahriman commented on code in PR #50612:
URL: https://github.com/apache/spark/pull/50612#discussion_r2049770733


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala:
##
@@ -227,6 +228,32 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter {
 }
   }
 
+  test("SPARK-X: unload on commit") {

Review Comment:
   Whoops yeah fixed






Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]

2025-04-17 Thread via GitHub


Kimahriman commented on code in PR #50612:
URL: https://github.com/apache/spark/pull/50612#discussion_r2049770928


##
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##
@@ -2519,6 +2519,16 @@ object SQLConf {
   .stringConf
   .createWithDefault(CompressionCodec.LZ4)
 
+  val STATE_STORE_UNLOAD_ON_COMMIT =
+    buildConf("spark.sql.streaming.stateStore.unloadOnCommit")
+      .internal()
+      .doc("When true, Spark will synchronously run maintenance and then close each StateStore " +
+        "instance on task completion. This reduce overhead involved in keeping every StateStore " +

Review Comment:
   Updated a little more with the doc






Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]

2025-04-17 Thread via GitHub


anishshri-db commented on PR #50612:
URL: https://github.com/apache/spark/pull/50612#issuecomment-2813774552

   @Kimahriman - yea and with 4.0, the only additional cost for `doMaintenance` 
would be around uploading the snapshot and deleting old versions. Both likely 
won't be expensive on a per batch basis (mostly a no-op for most batches) but 
you could see some spikes in latency when both these operations are performed. 
(I guess that's acceptable in your case rather than keeping the resources 
running)





Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]

2025-04-17 Thread via GitHub


Kimahriman commented on PR #50612:
URL: https://github.com/apache/spark/pull/50612#issuecomment-2813756089

   > @Kimahriman - if you are removing/adding executors per batch, then 
locality probably is not very useful.
   
   Yeah, this includes not even reporting to the coordinator as being active, 
since that's just used for locality.
   
   > But I'm curious about the perf diff you see with large state (especially 
as the large state grows) - I guess it might not matter a whole lot - because 
even today - you are doing a fresh pull for each batch ?
   
   Yeah, generally for us there's no performance drop, since many of our 
executors will end up getting deallocated between batches anyway, so we have to 
redownload the state each batch regardless. The long pole in the tent for us is 
generally the time it takes to create and upload a checkpoint. This is 
partially due to issues where a checkpoint is generally created every batch for 
RocksDB even with the changelog enabled, because of the hard-coded 10k row 
check as well as not initializing the latest snapshot version on a fresh load 
(both of which appear to be fixed for the 4.0 release).





Re: [PR] [SPARK-51823][SS] Add config to not persist state store on executors [spark]

2025-04-17 Thread via GitHub


anishshri-db commented on PR #50612:
URL: https://github.com/apache/spark/pull/50612#issuecomment-2813739210

   @Kimahriman - if you are removing/adding executors per batch, then locality 
probably is not very useful. But I'm curious about the perf diff you see with 
large state (especially as the large state grows) - I guess it might not matter 
a whole lot - because even today - you are doing a fresh pull for each batch ?

