anishshri-db commented on code in PR #53287:
URL: https://github.com/apache/spark/pull/53287#discussion_r2604084503
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -180,7 +180,7 @@ class RocksDB(
@volatile private var db: NativeRocksDB = _
@volatile private var changelogWriter: Option[StateStoreChangelogWriter] =
None
- private val enableChangelogCheckpointing: Boolean =
conf.enableChangelogCheckpointing
+ @volatile private var enableChangelogCheckpointing: Boolean =
conf.enableChangelogCheckpointing
Review Comment:
Why are we changing this to a `var`?
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -1438,6 +1441,27 @@ class StateStoreSuite extends
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
"HDFSBackedStateStoreProvider does not support checkpointFormatVersion >
1"))
}
+ test("SPARK-54420: HDFSBackedStateStoreProvider does not support load empty
store") {
Review Comment:
nit: `loading empty store`
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -322,12 +322,17 @@ private[sql] class HDFSBackedStateStoreProvider extends
StateStoreProvider with
override def getStore(
version: Long,
uniqueId: Option[String] = None,
- forceSnapshotOnCommit: Boolean = false): StateStore = {
+ forceSnapshotOnCommit: Boolean = false,
+ loadEmpty: Boolean = false): StateStore = {
if (uniqueId.isDefined) {
throw StateStoreErrors.stateStoreCheckpointIdsNotSupported(
"HDFSBackedStateStoreProvider does not support checkpointFormatVersion
> 1 " +
"but a state store checkpointID is passed in")
}
+ if (loadEmpty) {
+ throw StateStoreErrors.unsupportedOperationException("getStore",
+ "loadEmpty parameter is not supported in HDFSBackedStateStoreProvider")
Review Comment:
Not sure this is actionable for the user — can we rephrase the error message if
possible?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala:
##########
@@ -44,6 +44,16 @@ object SchemaUtil {
}
}
+ def getScanAllColumnFamiliesSchema(keySchema: StructType): StructType = {
+ new StructType()
+ // todo [SPARK-54443]: change keySchema to a more specific type after we
+ // can extract partition key from keySchema
Review Comment:
nit: extra space before `can`
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala:
##########
@@ -62,13 +72,7 @@ object SchemaUtil {
.add("value", valueSchema)
.add("partition_id", IntegerType)
} else if (sourceOptions.internalOnlyReadAllColumnFamilies) {
- new StructType()
- // todo [SPARK-54443]: change keySchema to a more specific type after
we
- // can extract partition key from keySchema
- .add("partition_key", keySchema)
- .add("key_bytes", BinaryType)
- .add("value_bytes", BinaryType)
- .add("column_family_name", StringType)
+ getScanAllColumnFamiliesSchema(keySchema)
Review Comment:
Why are we splitting out only this function?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala:
##########
@@ -44,6 +44,16 @@ object SchemaUtil {
}
}
+ def getScanAllColumnFamiliesSchema(keySchema: StructType): StructType = {
Review Comment:
nit: can we reword the function name here to make it clearer ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]