xuanyuanking commented on a change in pull request #34502:
URL: https://github.com/apache/spark/pull/34502#discussion_r751969435
##########
File path: docs/structured-streaming-programming-guide.md
##########
@@ -1956,8 +1956,21 @@ Here are the configs regarding to RocksDB instance of the state store provider:
<td>Whether we resets all ticker and histogram stats for RocksDB on load.</td>
<td>True</td>
</tr>
+ <tr>
+ <td>spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows</td>
+ <td>Whether we track the total number of rows in state store. Please refer the details in <a href="#performance-aspect-considerations">Performance-aspect considerations</a>.</td>
+ <td>True</td>
+ </tr>
</table>
+##### Performance-aspect considerations
+
+1. For write-heavy workloads, you may want to disable the track of total number of rows.
Review comment:
Do we have other considerations here, or will we add more in the future?
(Just want to double-check that the `1.` is not a typo.)
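
For context, a minimal sketch of how a user might act on this consideration and turn the tracking off for a write-heavy query. The config key is the one added in this diff; the application name is hypothetical, and the snippet assumes the RocksDB state store provider is selected through `spark.sql.streaming.stateStore.providerClass`.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: assumes a Spark build that includes the RocksDB state store provider.
val spark = SparkSession.builder()
  .appName("rocksdb-state-store-example") // hypothetical name
  // Select the RocksDB-backed state store provider.
  .config(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
  // Skip row-count tracking, as suggested for write-heavy workloads.
  .config("spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows", "false")
  .getOrCreate()
```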
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
##########
@@ -144,26 +156,28 @@ class RocksDB(
* Put the given value for the given key and return the last written value.
* @note This update is not committed to disk until commit() is called.
*/
- def put(key: Array[Byte], value: Array[Byte]): Array[Byte] = {
- val oldValue = writeBatch.getFromBatchAndDB(db, readOptions, key)
- writeBatch.put(key, value)
- if (oldValue == null) {
- numKeysOnWritingVersion += 1
+ def put(key: Array[Byte], value: Array[Byte]): Unit = {
+ if (conf.trackTotalNumberOfRows) {
+ val oldValue = writeBatch.getFromBatchAndDB(db, readOptions, key)
+ if (oldValue == null) {
+ numKeysOnWritingVersion += 1
+ }
}
- oldValue
+ writeBatch.put(key, value)
}
/**
* Remove the key if present, and return the previous value if it was present (null otherwise).
Review comment:
Same here: please update this comment as well.
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
##########
@@ -144,26 +156,28 @@ class RocksDB(
* Put the given value for the given key and return the last written value.
Review comment:
Please also update the doc comment accordingly: `put` now returns `Unit`, so "and return the last written value" no longer applies.
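
To illustrate the trade-off behind this change, here is a minimal self-contained sketch of a `put` that only pays for the existence check when tracking is on. `CountingStore`, its `mutable.Map` backing, and `trackedRows` are hypothetical stand-ins for illustration, not the actual `RocksDB` class or its API.

```scala
import scala.collection.mutable

// Hypothetical stand-in: a mutable.Map replaces the real write batch and DB.
final class CountingStore(trackTotalNumberOfRows: Boolean) {
  private val data = mutable.Map.empty[String, String]
  private var numKeys = 0L

  /** Put the given value for the given key. Nothing is returned. */
  def put(key: String, value: String): Unit = {
    // The lookup runs only when tracking is enabled (&& short-circuits).
    if (trackTotalNumberOfRows && !data.contains(key)) {
      numKeys += 1 // only a previously absent key grows the count
    }
    data.update(key, value)
  }

  /** Tracked row count, or None when tracking is disabled (a choice made for this sketch). */
  def trackedRows: Option[Long] =
    if (trackTotalNumberOfRows) Some(numKeys) else None
}
```

With tracking enabled, overwriting an existing key leaves the count unchanged; with tracking disabled, `put` never reads before writing, which is where the saving for write-heavy workloads would come from.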
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]