[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38880: [SPARK-38277][SS] Clear write batch after RocksDB state store's commit

2022-12-06 Thread GitBox


HeartSaVioR commented on code in PR #38880:
URL: https://github.com/apache/spark/pull/38880#discussion_r1041871281


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##
@@ -310,6 +311,9 @@ class RocksDB(
 "checkpoint" -> checkpointTimeMs,
 "fileSync" -> fileSyncTimeMs
   )
+  // reset resources as we already pushed the changes and it has been committed

Review Comment:
   Actually, I put this here intentionally, as I thought we should clean up only after a successful commit. But it looks like it's also safe to move this into `finally`, since we invalidate the loaded version anyway if any exception happens. Will do.
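
   To illustrate why clearing in `finally` is safe, here is a minimal sketch. `SketchStore`, its fields, the `failWith` parameter, and the use of `-1` as the "invalidated" version are all hypothetical stand-ins for illustration, not Spark's actual `RocksDB` class. The write batch is cleared whether or not the commit succeeds, and a failed commit invalidates the loaded version so the caller has to reload.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for the state store; not Spark's RocksDB class.
final class SketchStore {
  private val committed = mutable.Map.empty[String, String]
  private val writeBatch = mutable.LinkedHashMap.empty[String, String]
  private var loadedVersion: Long = 0L

  def put(key: String, value: String): Unit = writeBatch.put(key, value)

  def pendingWrites: Int = writeBatch.size
  def version: Long = loadedVersion
  def get(key: String): Option[String] = committed.get(key)

  // `failWith` only exists to simulate an exception during commit.
  def commit(failWith: Option[Throwable] = None): Long = {
    try {
      failWith.foreach(t => throw t)
      committed ++= writeBatch
      loadedVersion += 1
      loadedVersion
    } catch {
      case t: Throwable =>
        // Assumption: -1 marks the loaded version as invalid, forcing a reload.
        loadedVersion = -1L
        throw t
    } finally {
      // Runs on success and on failure, so no stale writes leak into the next commit.
      writeBatch.clear()
    }
  }
}
```

   Because a failed commit invalidates the loaded version, discarding the pending writes on failure loses nothing the caller could still use.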



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38880: [SPARK-38277][SS] Clear write batch after RocksDB state store's commit

2022-12-06 Thread GitBox


HeartSaVioR commented on code in PR #38880:
URL: https://github.com/apache/spark/pull/38880#discussion_r1041686643


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##
@@ -116,7 +116,9 @@ class RocksDBSuite extends SparkFunSuite {
 withDB(remoteDir, conf = conf) { db =>
   // Generate versions without cleaning up
   for (version <- 1 to 50) {
-db.put(version.toString, version.toString)  // update "1" -> "1", "2" -> "2", ...

Review Comment:
   I just changed the test to remove the existing key, which should make the intent more obvious.






[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38880: [SPARK-38277][SS] Clear write batch after RocksDB state store's commit

2022-12-06 Thread GitBox


HeartSaVioR commented on code in PR #38880:
URL: https://github.com/apache/spark/pull/38880#discussion_r1041654592


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##
@@ -116,7 +116,9 @@ class RocksDBSuite extends SparkFunSuite {
 withDB(remoteDir, conf = conf) { db =>
   // Generate versions without cleaning up
   for (version <- 1 to 50) {
-db.put(version.toString, version.toString)  // update "1" -> "1", "2" -> "2", ...

Review Comment:
   No, that functionality is not available here: we don't use TTL for general state store operations. But a remove would work as well. If you think it's easier to reason about, I can update the test.









[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38880: [SPARK-38277][SS] Clear write batch after RocksDB state store's commit

2022-12-06 Thread GitBox


HeartSaVioR commented on code in PR #38880:
URL: https://github.com/apache/spark/pull/38880#discussion_r1041595635


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##
@@ -116,7 +116,9 @@ class RocksDBSuite extends SparkFunSuite {
 withDB(remoteDir, conf = conf) { db =>
   // Generate versions without cleaning up
   for (version <- 1 to 50) {
-db.put(version.toString, version.toString)  // update "1" -> "1", "2" -> "2", ...

Review Comment:
   Sounds good, thanks for the suggestion!






[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38880: [SPARK-38277][SS] Clear write batch after RocksDB state store's commit

2022-12-06 Thread GitBox


HeartSaVioR commented on code in PR #38880:
URL: https://github.com/apache/spark/pull/38880#discussion_r1040611003


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##
@@ -116,7 +116,9 @@ class RocksDBSuite extends SparkFunSuite {
 withDB(remoteDir, conf = conf) { db =>
   // Generate versions without cleaning up
   for (version <- 1 to 50) {
-db.put(version.toString, version.toString)  // update "1" -> "1", "2" -> "2", ...

Review Comment:
   I'm not a RocksDB expert, but I can explain this in terms of the high-level architecture.
   
   https://github.com/facebook/rocksdb/wiki/RocksDB-Overview#3-high-level-architecture
   
   During RocksDB.commit(), we write synchronously and flush synchronously, meaning that each commit produces a new SST file for the new writes. (SST files are immutable, so new writes can't be appended to existing SST files.)
   
   That said, in each loop iteration we overwrite a bunch of existing keys and produce a new SST file containing those overwritten keys. L0 allows SST files to have overlapping key ranges, but the levels beyond L0 do not, and "compaction" is what enforces this. We trigger manual compaction based on the config, `compactOnCommit = true`.
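
   As a rough illustration of the point about overlapping L0 files, here is a toy model. `CompactionSketch`, `SstFile`, `overlaps`, and `compact` are hypothetical names invented for this sketch; it models files as sorted maps and does not use any RocksDB API. Two flushed files that contain the same keys overlap, and merging them (newest value wins) yields a single run with each key exactly once.

```scala
import scala.collection.immutable.SortedMap

// Toy model only: an "SST file" is an immutable sorted map of key -> value.
object CompactionSketch {
  type SstFile = SortedMap[String, String]

  // Two files overlap if their [firstKey, lastKey] ranges intersect.
  def overlaps(a: SstFile, b: SstFile): Boolean =
    a.nonEmpty && b.nonEmpty && a.firstKey <= b.lastKey && b.firstKey <= a.lastKey

  // Merge files given oldest first; entries from later (newer) files win.
  def compact(filesOldestFirst: Seq[SstFile]): SstFile =
    filesOldestFirst.foldLeft(SortedMap.empty[String, String]) { (merged, file) =>
      merged ++ file
    }
}
```

   In this model, two commits that each write keys "1" and "2" produce two overlapping files, and `compact` collapses them into one file holding only the latest values.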






[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38880: [SPARK-38277][SS] Clear write batch after RocksDB state store's commit

2022-12-05 Thread GitBox


HeartSaVioR commented on code in PR #38880:
URL: https://github.com/apache/spark/pull/38880#discussion_r1040611003


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##
@@ -116,7 +116,9 @@ class RocksDBSuite extends SparkFunSuite {
 withDB(remoteDir, conf = conf) { db =>
   // Generate versions without cleaning up
   for (version <- 1 to 50) {
-db.put(version.toString, version.toString)  // update "1" -> "1", "2" -> "2", ...

Review Comment:
   I'm not a RocksDB expert, but it looks like I can explain this in terms of the high-level architecture.
   
   https://github.com/facebook/rocksdb/wiki/RocksDB-Overview#3-high-level-architecture
   
   During RocksDB.commit(), we write synchronously and flush synchronously, meaning that each commit produces a new SST file for the new writes. (SST files are immutable, so new writes can't be appended to existing SST files.)
   
   That said, in each loop iteration we overwrite a bunch of existing keys and produce a new SST file containing those overwritten keys. L0 allows SST files to have overlapping key ranges, but the levels beyond L0 do not, and "compaction" is what enforces this. We trigger manual compaction based on the config, `compactOnCommit = true`.






[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38880: [SPARK-38277][SS] Clear write batch after RocksDB state store's commit

2022-12-05 Thread GitBox


HeartSaVioR commented on code in PR #38880:
URL: https://github.com/apache/spark/pull/38880#discussion_r1040611003


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##
@@ -116,7 +116,9 @@ class RocksDBSuite extends SparkFunSuite {
 withDB(remoteDir, conf = conf) { db =>
   // Generate versions without cleaning up
   for (version <- 1 to 50) {
-db.put(version.toString, version.toString)  // update "1" -> "1", "2" -> "2", ...

Review Comment:
   I'm not a RocksDB expert, but it looks like I can explain this in terms of the high-level architecture.
   
   https://github.com/facebook/rocksdb/wiki/RocksDB-Overview#3-high-level-architecture
   
   During RocksDB.commit(), we write synchronously and flush synchronously, meaning that each commit produces a new SST file for the new writes. (SST files are immutable, so new writes can't be appended to existing SST files.)
   
   That said, in each loop iteration we overwrite a bunch of existing keys and produce a new SST file containing those overwritten keys. L0 allows SST files to have overlapping key ranges, but the levels beyond L0 do not, and "compaction" is what enforces this.






[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38880: [SPARK-38277][SS] Clear write batch after RocksDB state store's commit

2022-12-05 Thread GitBox


HeartSaVioR commented on code in PR #38880:
URL: https://github.com/apache/spark/pull/38880#discussion_r1040611003


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##
@@ -116,7 +116,9 @@ class RocksDBSuite extends SparkFunSuite {
 withDB(remoteDir, conf = conf) { db =>
   // Generate versions without cleaning up
   for (version <- 1 to 50) {
-db.put(version.toString, version.toString)  // update "1" -> "1", "2" -> "2", ...

Review Comment:
   I'm not a RocksDB expert, but it looks like we can expect compaction, based on the high-level architecture.
   
   https://github.com/facebook/rocksdb/wiki/RocksDB-Overview#3-high-level-architecture
   
   During RocksDB.commit(), we write synchronously and flush synchronously, meaning that each commit produces a new SST file for the new writes. (SST files are immutable, so new writes can't be appended to existing SST files.)
   
   That said, in each loop iteration we overwrite a bunch of existing keys and produce a new SST file containing those overwritten keys. L0 allows SST files to have overlapping key ranges, but the levels beyond L0 do not, and "compaction" is what enforces this.






[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38880: [SPARK-38277][SS] Clear write batch after RocksDB state store's commit

2022-12-02 Thread GitBox


HeartSaVioR commented on code in PR #38880:
URL: https://github.com/apache/spark/pull/38880#discussion_r1038137730


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##
@@ -116,7 +116,9 @@ class RocksDBSuite extends SparkFunSuite {
 withDB(remoteDir, conf = conf) { db =>
   // Generate versions without cleaning up
   for (version <- 1 to 50) {
-db.put(version.toString, version.toString)  // update "1" -> "1", "2" -> "2", ...

Review Comment:
   Yeah, I don't know the details of RocksDB well, so I can't say that compaction would never happen with pure additions (it would be a smart move for RocksDB to perform compaction if there are 1s of SST files), but at least the test was not sufficient to trigger compaction.






[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38880: [SPARK-38277][SS] Clear write batch after RocksDB state store's commit

2022-12-02 Thread GitBox


HeartSaVioR commented on code in PR #38880:
URL: https://github.com/apache/spark/pull/38880#discussion_r1037968475


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##
@@ -116,7 +116,9 @@ class RocksDBSuite extends SparkFunSuite {
 withDB(remoteDir, conf = conf) { db =>
   // Generate versions without cleaning up
   for (version <- 1 to 50) {
-db.put(version.toString, version.toString)  // update "1" -> "1", "2" -> "2", ...

Review Comment:
   This is something that could have revealed the bug earlier...
   
   The test was incorrect. It expects compaction to happen, but if we don't "overwrite" existing keys and only put new keys here, compaction does not happen, at least with Spark's default config. The reason compaction did happen before is that the write batch wasn't cleared during the loop, so writes accumulated in the same write batch, which led to overwrites. If we fix the code but leave this test as it is, we end up with 50 SST files, meaning no compaction happens, and the test fails.
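
   The accumulation effect can be sketched with a toy model. `BatchModel`, `BatchDemo`, and their members are hypothetical names for illustration, not Spark or RocksDB code, and the model assumes each commit flushes the whole write batch as one "file" of keys. It runs the same loop as the old test (put a new key, then commit) with and without clearing the batch after commit.

```scala
import scala.collection.mutable

// Toy model only: each commit flushes the entire write batch as one "file" of keys.
final class BatchModel(clearAfterCommit: Boolean) {
  private val batch = mutable.LinkedHashMap.empty[String, String]
  val flushedFiles = mutable.ArrayBuffer.empty[Set[String]]

  def put(key: String, value: String): Unit = batch.put(key, value)

  def commit(): Unit = {
    flushedFiles += batch.keySet.toSet        // snapshot of the keys written by this commit
    if (clearAfterCommit) batch.clear()       // the fix: start the next version with an empty batch
  }
}

object BatchDemo {
  // Number of distinct keys that were written by more than one commit, i.e. overwritten.
  def overwrittenKeys(files: Seq[Set[String]]): Int =
    files.flatten.groupBy(identity).count { case (_, occurrences) => occurrences.size > 1 }

  // Same loop shape as the old test: one new key per version, then commit.
  def run(clearAfterCommit: Boolean): Int = {
    val model = new BatchModel(clearAfterCommit)
    for (version <- 1 to 50) {
      model.put(version.toString, version.toString)
      model.commit()
    }
    overwrittenKeys(model.flushedFiles.toSeq)
  }
}
```

   Under this model, `BatchDemo.run(clearAfterCommit = false)` reports 49 overwritten keys, while `BatchDemo.run(clearAfterCommit = true)` reports 0, which matches the observation that pure additions stop producing overwrites once the batch is cleared.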





