anishshri-db commented on code in PR #53316:
URL: https://github.com/apache/spark/pull/53316#discussion_r2628591013
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -258,32 +273,117 @@ class StatePartitionAllColumnFamiliesReader(
partition: StateStoreInputPartition,
schema: StructType,
keyStateEncoderSpec: KeyStateEncoderSpec,
- stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema])
+ defaultStateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema],
+ stateSchemaProviderOpt: Option[StateSchemaProvider],
+ allColumnFamiliesReaderInfo: AllColumnFamiliesReaderInfo)
extends StatePartitionReaderBase(
storeConf,
hadoopConf, partition, schema,
- keyStateEncoderSpec, None, stateStoreColFamilySchemaOpt, None, None) {
+ keyStateEncoderSpec, None,
+ defaultStateStoreColFamilySchemaOpt,
+ stateSchemaProviderOpt, None) {
- private lazy val store: ReadStateStore = {
+ private val stateStoreColFamilySchemas =
allColumnFamiliesReaderInfo.colFamilySchemas
+ private val stateVariableInfos =
allColumnFamiliesReaderInfo.stateVariableInfos
+
+ private def isListType(colFamilyName: String): Boolean = {
+ SchemaUtil.checkVariableType(
+ stateVariableInfos.find(info => info.stateName == colFamilyName),
+ StateVariableType.ListState)
+ }
+
+ override protected lazy val provider: StateStoreProvider = {
+ val stateStoreId =
StateStoreId(partition.sourceOptions.stateCheckpointLocation.toString,
+ partition.sourceOptions.operatorId, partition.partition,
partition.sourceOptions.storeName)
+ val stateStoreProviderId = StateStoreProviderId(stateStoreId,
partition.queryId)
+ val useColumnFamilies = stateStoreColFamilySchemas.size > 1
+ StateStoreProvider.createAndInit(
+ stateStoreProviderId, keySchema, valueSchema, keyStateEncoderSpec,
+ useColumnFamilies, storeConf, hadoopConf.value,
+ useMultipleValuesPerKey = false, stateSchemaProviderOpt)
+ }
+
+
+ private def checkAllColFamiliesExist(
+ colFamilyNames: List[String], stateStore: StateStore
Review Comment:
nit: move arg to line below ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]