anishshri-db commented on code in PR #47574:
URL: https://github.com/apache/spark/pull/47574#discussion_r1735292638
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -59,40 +64,44 @@ abstract class StatePartitionReaderBase(
hadoopConf: SerializableConfiguration,
partition: StateStoreInputPartition,
schema: StructType,
- stateStoreMetadata: Array[StateMetadataTableEntry])
+ keyStateEncoderSpec: KeyStateEncoderSpec,
+ stateVariableInfoOpt: Option[TransformWithStateVariableInfo],
+ stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema])
extends PartitionReader[InternalRow] with Logging {
- private val keySchema = SchemaUtil.getSchemaAsDataType(schema,
"key").asInstanceOf[StructType]
- private val valueSchema = SchemaUtil.getSchemaAsDataType(schema,
"value").asInstanceOf[StructType]
+ protected val keySchema = SchemaUtil.getSchemaAsDataType(
+ schema, "key").asInstanceOf[StructType]
+ protected val valueSchema = SchemaUtil.getSchemaAsDataType(
+ schema, "value").asInstanceOf[StructType]
protected lazy val provider: StateStoreProvider = {
val stateStoreId =
StateStoreId(partition.sourceOptions.stateCheckpointLocation.toString,
partition.sourceOptions.operatorId, partition.partition,
partition.sourceOptions.storeName)
val stateStoreProviderId = StateStoreProviderId(stateStoreId,
partition.queryId)
- val numColsPrefixKey = if (stateStoreMetadata.isEmpty) {
- logWarning("Metadata for state store not found, possible cause is this
checkpoint " +
- "is created by older version of spark. If the query has session window
aggregation, " +
- "the state can't be read correctly and runtime exception will be
thrown. " +
- "Run the streaming query in newer spark version to generate state
metadata " +
- "can fix the issue.")
- 0
- } else {
- require(stateStoreMetadata.length == 1)
- stateStoreMetadata.head.numColsPrefixKey
- }
- // TODO: currently we don't support RangeKeyScanStateEncoderSpec. Support
for this will be
- // added in the future along with state metadata changes.
- // Filed JIRA here: https://issues.apache.org/jira/browse/SPARK-47524
- val keyStateEncoderType = if (numColsPrefixKey > 0) {
- PrefixKeyScanStateEncoderSpec(keySchema, numColsPrefixKey)
+ val useColFamilies = if (stateVariableInfoOpt.isDefined) {
+ true
} else {
- NoPrefixKeyStateEncoderSpec(keySchema)
+ false
}
- StateStoreProvider.createAndInit(
- stateStoreProviderId, keySchema, valueSchema, keyStateEncoderType,
- useColumnFamilies = false, storeConf, hadoopConf.value,
+ val provider = StateStoreProvider.createAndInit(
+ stateStoreProviderId, keySchema, valueSchema, keyStateEncoderSpec,
+ useColumnFamilies = useColFamilies, storeConf, hadoopConf.value,
useMultipleValuesPerKey = false)
+
+ if (useColFamilies) {
+ val store = provider.getStore(partition.sourceOptions.batchId + 1)
+ require(stateStoreColFamilySchemaOpt.isDefined)
+ val stateStoreColFamilySchema = stateStoreColFamilySchemaOpt.get
Review Comment:
I don't think that will work, @HeartSaVioR. In this case, on the regular
path, the operator's `init` function invokes the `createColFamilyIfAbsent`
call, which adds the encoding info to the map in the provider. So we need to
emulate that here as well.
If I remove the check, the test fails as expected because it's unable to find
the encoder in the provider.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]