micheal-o commented on code in PR #53459:
URL: https://github.com/apache/spark/pull/53459#discussion_r2670074255


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -296,29 +319,31 @@ class StateDataSource extends TableProvider with 
DataSourceRegister with Logging
 
           if (sourceOptions.readRegisteredTimers) {
             stateVarName = TimerStateUtils.getTimerStateVarNames(timeMode)._1
-          }
-          // When reading all column families (for repartitioning), we collect 
all state variable
-          // infos instead of validating a specific stateVarName. This skips 
the normal validation
-          // logic because we're not reading a specific state variable - we're 
reading all of them.
-          if (sourceOptions.internalOnlyReadAllColumnFamilies) {
+          } else if (sourceOptions.internalOnlyReadAllColumnFamilies) {

Review Comment:
   why not a separate if condition? Right now you are doing:
   ```
   if {}
   else if {}
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -296,29 +319,31 @@ class StateDataSource extends TableProvider with 
DataSourceRegister with Logging
 
           if (sourceOptions.readRegisteredTimers) {
             stateVarName = TimerStateUtils.getTimerStateVarNames(timeMode)._1
-          }
-          // When reading all column families (for repartitioning), we collect 
all state variable
-          // infos instead of validating a specific stateVarName. This skips 
the normal validation
-          // logic because we're not reading a specific state variable - we're 
reading all of them.
-          if (sourceOptions.internalOnlyReadAllColumnFamilies) {
+          } else if (sourceOptions.internalOnlyReadAllColumnFamilies) {
+            // When reading all column families (for repartitioning) for TWS 
operator,
+            // we will just choose a random state as placeholder for default 
column family,
+            // because we need to use matching stateVariableInfo and 
stateStoreColFamilySchemaOpt
+            // to inferSchema (partitionKey in particular) later
+            stateVarName = operatorProperties.stateVariables.head.stateName
             stateVariableInfos = operatorProperties.stateVariables
-          } else {
-            var stateVarInfoList = operatorProperties.stateVariables
-              .filter(stateVar => stateVar.stateName == stateVarName)
-            if (stateVarInfoList.isEmpty &&
+          }
+
+          var stateVarInfoList = operatorProperties.stateVariables
+            .filter(stateVar => stateVar.stateName == stateVarName)
+          if (!TimerStateUtils.isTimerCFName(stateVarName) &&
               
StateStoreColumnFamilySchemaUtils.isTestingInternalColFamily(stateVarName)) {
-              // pass this dummy TWSStateVariableInfo for TWS internal column 
family during testing,
-              // because internalColumns are not register in 
operatorProperties.stateVariables,
-              // thus stateVarInfoList will be empty.
-              stateVarInfoList = List(TransformWithStateVariableInfo(
-                stateVarName, StateVariableType.ValueState, false
-              ))
-            }
-            require(stateVarInfoList.size == 1, s"Failed to find unique state 
variable info " +
-              s"for state variable $stateVarName in operator 
${sourceOptions.operatorId}")
-            val stateVarInfo = stateVarInfoList.head
-            transformWithStateVariableInfoOpt = Some(stateVarInfo)
+            // pass this dummy TWSStateVariableInfo for TWS internal column 
family during testing,
+            // because internalColumns are not register in 
operatorProperties.stateVariables,

Review Comment:
   nit: internal column families or internal CFs



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -374,13 +399,22 @@ class StateDataSource extends TableProvider with 
DataSourceRegister with Logging
       }
     }
 
+    val operatorName = if (storeMetadata.nonEmpty) 
storeMetadata.head.operatorName else ""
+    val stateFormatVersion = getStateFormatVersion(storeMetadata, 
sourceOptions.resolvedCpLocation)
+    val allColFamilyReaderInfoOpt: Option[AllColumnFamiliesReaderInfo] =
+      if (sourceOptions.internalOnlyReadAllColumnFamilies) {
+        Option(AllColumnFamiliesReaderInfo(

Review Comment:
   nit: `Some`



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -296,29 +319,31 @@ class StateDataSource extends TableProvider with 
DataSourceRegister with Logging
 
           if (sourceOptions.readRegisteredTimers) {
             stateVarName = TimerStateUtils.getTimerStateVarNames(timeMode)._1
-          }
-          // When reading all column families (for repartitioning), we collect 
all state variable
-          // infos instead of validating a specific stateVarName. This skips 
the normal validation
-          // logic because we're not reading a specific state variable - we're 
reading all of them.
-          if (sourceOptions.internalOnlyReadAllColumnFamilies) {
+          } else if (sourceOptions.internalOnlyReadAllColumnFamilies) {
+            // When reading all column families (for repartitioning) for TWS 
operator,
+            // we will just choose a random state as placeholder for default 
column family,
+            // because we need to use matching stateVariableInfo and 
stateStoreColFamilySchemaOpt
+            // to inferSchema (partitionKey in particular) later
+            stateVarName = operatorProperties.stateVariables.head.stateName
             stateVariableInfos = operatorProperties.stateVariables
-          } else {
-            var stateVarInfoList = operatorProperties.stateVariables
-              .filter(stateVar => stateVar.stateName == stateVarName)
-            if (stateVarInfoList.isEmpty &&
+          }
+
+          var stateVarInfoList = operatorProperties.stateVariables
+            .filter(stateVar => stateVar.stateName == stateVarName)
+          if (!TimerStateUtils.isTimerCFName(stateVarName) &&

Review Comment:
   Are you sure this doesn't apply to timer CFs?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -374,13 +399,22 @@ class StateDataSource extends TableProvider with 
DataSourceRegister with Logging
       }
     }
 
+    val operatorName = if (storeMetadata.nonEmpty) 
storeMetadata.head.operatorName else ""
+    val stateFormatVersion = getStateFormatVersion(storeMetadata, 
sourceOptions.resolvedCpLocation)

Review Comment:
   These 2 vals should be moved under the if all CF reader check below, since 
it is only used there



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/StateStoreColumnFamilySchemaUtils.scala:
##########
@@ -114,6 +115,58 @@ object StateStoreColumnFamilySchemaUtils {
     org.apache.spark.util.Utils.isTesting && isInternalColFamily(colFamilyName)
   }
 
+  /**
+   * Extracts the base state variable name from internal column family names.
+   * Internal column families are auxiliary data structures (TTL index, min 
expiry index,
+   * count index, row counter) that are associated with a user-defined state 
variable.
+   *
+   * @param colFamilyName The internal column family name (must start with "$")
+   * @return The base state variable name
+   * @throws IllegalArgumentException if the column family name is not a 
recognized internal type
+   */
+  def getStateNameForInternalCF(colFamilyName: String): String = {
+    if (isTtlColFamilyName(colFamilyName)) {
+      getStateNameFromTtlColFamily(colFamilyName)
+    } else if (isMinExpiryIndexCFName(colFamilyName)) {
+      getStateNameFromMinExpiryIndexCFName(colFamilyName)
+    } else if (isCountIndexCFName(colFamilyName)) {
+      getStateNameFromCountIndexCFName(colFamilyName)
+    } else if (isRowCounterCFName(colFamilyName)) {
+      getStateNameFromRowCounterCFName(colFamilyName)
+    } else if (TimerStateUtils.isTimerCFName(colFamilyName)) {
+      if (TimerStateUtils.isTimerSecondaryIndexCF(colFamilyName)) {

Review Comment:
   nit: add comment why



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -38,7 +39,9 @@ import org.apache.spark.util.{NextIterator, 
SerializableConfiguration}
  */
 case class AllColumnFamiliesReaderInfo(
     colFamilySchemas: Set[StateStoreColFamilySchema] = Set.empty,
-    stateVariableInfos: List[TransformWithStateVariableInfo] = List.empty)
+    stateVariableInfos: List[TransformWithStateVariableInfo] = List.empty,
+    operatorName: String = "",

Review Comment:
   operatorName is required. Shouldn't default to ""



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala:
##########
@@ -49,8 +50,27 @@ object SchemaUtil {
       keySchema: StructType,
       valueSchema: StructType,
       transformWithStateVariableInfoOpt: 
Option[TransformWithStateVariableInfo],
-      stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema]): 
StructType = {
-    if (transformWithStateVariableInfoOpt.isDefined) {
+      stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema],
+      operatorName: Option[String],
+      stateFormatVersion: Option[Int] = None): StructType = {
+    if (sourceOptions.internalOnlyReadAllColumnFamilies) {
+      require(stateStoreColFamilySchemaOpt.isDefined)
+      require(operatorName.isDefined)
+      val colFamilyName: String = 
stateStoreColFamilySchemaOpt.map(_.colFamilyName)
+        .getOrElse(StateStore.DEFAULT_COL_FAMILY_NAME)

Review Comment:
   do you need this given that you require 
`stateStoreColFamilySchemaOpt.isDefined`



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -132,6 +133,28 @@ class StateDataSource extends TableProvider with 
DataSourceRegister with Logging
 
   override def supportsExternalMetadata(): Boolean = false
 
+  /**
+   * Return the state format version for SYMMETRIC_HASH_JOIN operators.
+   * This currently only supports join operators because this function is only 
used to
+   * create a PartitionKeyExtractor through PartitionKeyExtractorFactory where 
only join operators
+   * require state format version
+   */
+  private def getStateFormatVersion(
+      storeMetadata: Array[StateMetadataTableEntry],
+      checkpointLocation: String
+    ): Option[Int] = {
+    if (storeMetadata.nonEmpty &&
+      storeMetadata.head.operatorName == 
StatefulOperatorsUtils.SYMMETRIC_HASH_JOIN_EXEC_OP_NAME) {
+      new StreamingQueryCheckpointMetadata(session, 
checkpointLocation).offsetLog
+        .getLatest()

Review Comment:
   get for the batch we are reading instead



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to