anishshri-db commented on code in PR #47574:
URL: https://github.com/apache/spark/pull/47574#discussion_r1710427033
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -59,40 +61,37 @@ abstract class StatePartitionReaderBase(
hadoopConf: SerializableConfiguration,
partition: StateStoreInputPartition,
schema: StructType,
- stateStoreMetadata: Array[StateMetadataTableEntry])
+ keyStateEncoderSpec: KeyStateEncoderSpec,
+ stateVariableInfoOpt: Option[TransformWithStateVariableInfo])
extends PartitionReader[InternalRow] with Logging {
- private val keySchema = SchemaUtil.getSchemaAsDataType(schema,
"key").asInstanceOf[StructType]
- private val valueSchema = SchemaUtil.getSchemaAsDataType(schema,
"value").asInstanceOf[StructType]
+ protected val keySchema = SchemaUtil.getSchemaAsDataType(
+ schema, "key").asInstanceOf[StructType]
+ protected val valueSchema = SchemaUtil.getSchemaAsDataType(
+ schema, "value").asInstanceOf[StructType]
protected lazy val provider: StateStoreProvider = {
val stateStoreId =
StateStoreId(partition.sourceOptions.stateCheckpointLocation.toString,
partition.sourceOptions.operatorId, partition.partition,
partition.sourceOptions.storeName)
val stateStoreProviderId = StateStoreProviderId(stateStoreId,
partition.queryId)
- val numColsPrefixKey = if (stateStoreMetadata.isEmpty) {
- logWarning("Metadata for state store not found, possible cause is this
checkpoint " +
- "is created by older version of spark. If the query has session window
aggregation, " +
- "the state can't be read correctly and runtime exception will be
thrown. " +
- "Run the streaming query in newer spark version to generate state
metadata " +
- "can fix the issue.")
- 0
- } else {
- require(stateStoreMetadata.length == 1)
- stateStoreMetadata.head.numColsPrefixKey
- }
- // TODO: currently we don't support RangeKeyScanStateEncoderSpec. Support
for this will be
- // added in the future along with state metadata changes.
- // Filed JIRA here: https://issues.apache.org/jira/browse/SPARK-47524
- val keyStateEncoderType = if (numColsPrefixKey > 0) {
- PrefixKeyScanStateEncoderSpec(keySchema, numColsPrefixKey)
+ val useColFamilies = if (stateVariableInfoOpt.isDefined) {
+ true
} else {
- NoPrefixKeyStateEncoderSpec(keySchema)
+ false
}
- StateStoreProvider.createAndInit(
- stateStoreProviderId, keySchema, valueSchema, keyStateEncoderType,
- useColumnFamilies = false, storeConf, hadoopConf.value,
+ val provider = StateStoreProvider.createAndInit(
+ stateStoreProviderId, keySchema, valueSchema, keyStateEncoderSpec,
+ useColumnFamilies = useColFamilies, storeConf, hadoopConf.value,
useMultipleValuesPerKey = false)
+
+ if (useColFamilies) {
+ val store = provider.getStore(partition.sourceOptions.batchId + 1)
Review Comment:
Yeah, it might still be needed, but I need to merge back his change for the
TTL changes to work.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]