zsxwing commented on a change in pull request #33749:
URL: https://github.com/apache/spark/pull/33749#discussion_r690837847
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
##########
@@ -497,23 +497,38 @@ case class RocksDBConf(
blockSizeKB: Long,
blockCacheSizeMB: Long,
lockAcquireTimeoutMs: Long,
- resetStatsOnLoad : Boolean)
+ resetStatsOnLoad : Boolean,
+ formatVersion: Int)
object RocksDBConf {
/** Common prefix of all confs in SQLConf that affects RocksDB */
val ROCKSDB_CONF_NAME_PREFIX = "spark.sql.streaming.stateStore.rocksdb"
- private case class ConfEntry(name: String, default: String) {
- def fullName: String = s"$ROCKSDB_CONF_NAME_PREFIX.${name}".toLowerCase(Locale.ROOT)
+ case class ConfEntry(name: String, default: String) {
+ def fullName: String = s"$ROCKSDB_CONF_NAME_PREFIX.${name}"
}
// Configuration that specifies whether to compact the RocksDB data every time data is committed
- private val COMPACT_ON_COMMIT_CONF = ConfEntry("compactOnCommit", "false")
+ val COMPACT_ON_COMMIT_CONF = ConfEntry("compactOnCommit", "false")
private val PAUSE_BG_WORK_FOR_COMMIT_CONF =
ConfEntry("pauseBackgroundWorkForCommit", "true")
private val BLOCK_SIZE_KB_CONF = ConfEntry("blockSizeKB", "4")
private val BLOCK_CACHE_SIZE_MB_CONF = ConfEntry("blockCacheSizeMB", "8")
- private val LOCK_ACQUIRE_TIMEOUT_MS_CONF = ConfEntry("lockAcquireTimeoutMs", "60000")
+ val LOCK_ACQUIRE_TIMEOUT_MS_CONF = ConfEntry("lockAcquireTimeoutMs", "60000")
private val RESET_STATS_ON_LOAD = ConfEntry("resetStatsOnLoad", "true")
+ // Configuration to set the RocksDB format version. When upgrading the RocksDB version in Spark,
+ // it may introduce a new table format version that can not be supported by an old RocksDB version
+ // used by an old Spark version. Hence, we store the table format version in the checkpoint when
+ // a query starts, and when restarting a query from a checkpoint, we will use the format version
+ // in the checkpoint. This will ensure the user can still rollback their Spark version for an
+ // existing query when RocksDB changes its default table format in a new version. The user can
Review comment:
This is to encourage users to upgrade their Spark. If we tell a user
that upgrading the Spark version for an existing production query means they
could not roll back to the old Spark version — because the old Spark version
cannot read the new checkpoint format — they would probably hesitate to upgrade
Spark (they may hit regressions in the new Spark version).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]