chaoqin-li1123 commented on code in PR #43788:
URL: https://github.com/apache/spark/pull/43788#discussion_r1393178895


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala:
##########
@@ -195,7 +195,7 @@ class StateMetadataPartitionReader(
     }
   }
 
-  private lazy val stateMetadata: Iterator[StateMetadataTableEntry] = {
+  private[state] lazy val stateMetadata: Iterator[StateMetadataTableEntry] = {

Review Comment:
   Yes, I use this class to get the number of prefix key columns, but I don't 
want to read it from the internal row because the row is untyped. So I need to 
expose the API to access the metadata as a case class.
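
   To illustrate the point about typed access, here is a minimal, 
self-contained sketch of the lookup. It does not use the real Spark classes: 
`MetadataEntry` is a hypothetical, simplified stand-in for 
`StateMetadataTableEntry` that carries only the three fields referenced in this 
PR, and `PrefixKeyLookup` is a name invented for this example.

```scala
// Hypothetical stand-in for StateMetadataTableEntry (assumption: only the
// three fields used in this PR are modelled; the field types are assumed too).
final case class MetadataEntry(
    operatorId: Long,
    stateStoreName: String,
    numColsPrefixKey: Int)

object PrefixKeyLookup {
  // Returns the number of prefix key columns for the given operator and store,
  // or 0 when no metadata entry exists (e.g. an older checkpoint).
  def numColsPrefixKey(
      entries: Seq[MetadataEntry],
      operatorId: Long,
      storeName: String): Int = {
    // Typed field access: no column ordinals or casts as with an untyped row.
    val matches = entries.filter { e =>
      e.operatorId == operatorId && e.stateStoreName == storeName
    }
    if (matches.isEmpty) {
      0
    } else {
      require(matches.length == 1, s"expected one entry, found ${matches.length}")
      matches.head.numColsPrefixKey
    }
  }
}
```

   With a case class, the compiler checks the field names and types. Reading 
the same values from an untyped row would rely on column positions that can 
change without a compile error.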



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -57,12 +58,31 @@ class StatePartitionReader(
       partition.sourceOptions.operatorId, partition.partition, 
partition.sourceOptions.storeName)
     val stateStoreProviderId = StateStoreProviderId(stateStoreId, 
partition.queryId)
 
+    val allStateStoreMetadata = new StateMetadataPartitionReader(
+      partition.sourceOptions.stateCheckpointLocation.getParent.toString, 
hadoopConf)
+      .stateMetadata.toArray
+
+    val stateStoreMetadata = allStateStoreMetadata.filter(entry =>
+      entry.operatorId == partition.sourceOptions.operatorId
+        && entry.stateStoreName == partition.sourceOptions.storeName
+    )
+    val numColsPrefixKey = if (stateStoreMetadata.isEmpty) {
+      logWarning("Metadata for state store not found, possible cause is this 
checkpoint " +
+        "is created by older version of spark. The state of session window 
aggregation can't be " +
+        "read correctly without state metadata and runtime exception will be 
thrown. " +
+        "Run the streaming query in newer spark version to generate state 
metadata.")
+      0
+    } else {
+      require(stateStoreMetadata.length == 1)
+      stateStoreMetadata.head.numColsPrefixKey
+    }
+
     // TODO: This does not handle the case of session window aggregation; we 
don't have an

Review Comment:
   Removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]