zifeif2 commented on code in PR #53459:
URL: https://github.com/apache/spark/pull/53459#discussion_r2673352918


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -296,29 +319,31 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
 
           if (sourceOptions.readRegisteredTimers) {
             stateVarName = TimerStateUtils.getTimerStateVarNames(timeMode)._1
-          }
-          // When reading all column families (for repartitioning), we collect all state variable
-          // infos instead of validating a specific stateVarName. This skips the normal validation
-          // logic because we're not reading a specific state variable - we're reading all of them.
-          if (sourceOptions.internalOnlyReadAllColumnFamilies) {
+          } else if (sourceOptions.internalOnlyReadAllColumnFamilies) {

Review Comment:
   Hmm, either will work, since we don't allow setting both `readRegisteredTimers`
and `internalOnlyReadAllColumnFamilies` at the same time. I can change it in the
next version.
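
   To illustrate why either form works, here is a minimal sketch. `Opts`, `separateIfs`, and `elseIf` are hypothetical stand-ins rather than the actual `StateSourceOptions` code, and the `require` call models the assumption that the two options cannot be set together.

   ```scala
   object BranchEquivalence {
     // Hypothetical stand-in for the two source options discussed above.
     final case class Opts(readRegisteredTimers: Boolean, readAllColumnFamilies: Boolean) {
       // Assumption: option parsing rejects setting both flags at once.
       require(!(readRegisteredTimers && readAllColumnFamilies), "flags are mutually exclusive")
     }

     // Two independent `if` statements, as in the removed lines of the diff.
     def separateIfs(o: Opts): String = {
       var chosen = "user"
       if (o.readRegisteredTimers) { chosen = "timers" }
       if (o.readAllColumnFamilies) { chosen = "all" }
       chosen
     }

     // A single `if ... else if` chain, as in the added line of the diff.
     def elseIf(o: Opts): String = {
       var chosen = "user"
       if (o.readRegisteredTimers) {
         chosen = "timers"
       } else if (o.readAllColumnFamilies) {
         chosen = "all"
       }
       chosen
     }

     def main(args: Array[String]): Unit = {
       val valid = Seq(Opts(false, false), Opts(true, false), Opts(false, true))
       // Every combination that passes validation takes the same branch in both forms.
       valid.foreach(o => assert(separateIfs(o) == elseIf(o)))
     }
   }
   ```

   The two forms would only diverge if both flags were true, which the validation rules out.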



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -374,13 +399,22 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
       }
     }
 
+    val operatorName = if (storeMetadata.nonEmpty) storeMetadata.head.operatorName else ""
+    val stateFormatVersion = getStateFormatVersion(storeMetadata, sourceOptions.resolvedCpLocation)
+    val allColFamilyReaderInfoOpt: Option[AllColumnFamiliesReaderInfo] =
+      if (sourceOptions.internalOnlyReadAllColumnFamilies) {
+        Option(AllColumnFamiliesReaderInfo(

Review Comment:
   Curious: when do we prefer one over the other? I only know from the
[style guide](https://github.com/databricks/scala-style-guide) that we use
`Option` to guard against `null`, but I don't think that applies here.
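
   For reference, a small sketch of the difference, assuming the alternative under discussion is `Some(...)`. `ReaderInfo` is a hypothetical stand-in for `AllColumnFamiliesReaderInfo`, and the helper names are illustrative only.

   ```scala
   object OptionVsSome {
     // Hypothetical stand-in for AllColumnFamiliesReaderInfo; the field is illustrative.
     final case class ReaderInfo(name: String)

     // Option.apply maps null to None, so it suits values that may be null.
     def wrapNullable(info: ReaderInfo): Option[ReaderInfo] = Option(info)

     // Some.apply wraps whatever it is given, including null.
     def wrapKnownNonNull(info: ReaderInfo): Option[ReaderInfo] = Some(info)

     def main(args: Array[String]): Unit = {
       val missing: ReaderInfo = null
       println(wrapNullable(missing))     // prints None
       println(wrapKnownNonNull(missing)) // prints Some(null)
       // For a freshly constructed case class instance the two are interchangeable.
       println(wrapNullable(ReaderInfo("cf")) == wrapKnownNonNull(ReaderInfo("cf"))) // prints true
     }
   }
   ```

   Since a case class constructor never returns `null`, both wrappers give the same result at this call site, so the choice there is mostly about signalling intent.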



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -296,29 +319,31 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
 
           if (sourceOptions.readRegisteredTimers) {
             stateVarName = TimerStateUtils.getTimerStateVarNames(timeMode)._1
-          }
-          // When reading all column families (for repartitioning), we collect all state variable
-          // infos instead of validating a specific stateVarName. This skips the normal validation
-          // logic because we're not reading a specific state variable - we're reading all of them.
-          if (sourceOptions.internalOnlyReadAllColumnFamilies) {
+          } else if (sourceOptions.internalOnlyReadAllColumnFamilies) {
+            // When reading all column families (for repartitioning) for TWS operator,
+            // we will just choose a random state as placeholder for default column family,
+            // because we need to use matching stateVariableInfo and stateStoreColFamilySchemaOpt
+            // to inferSchema (partitionKey in particular) later
+            stateVarName = operatorProperties.stateVariables.head.stateName
             stateVariableInfos = operatorProperties.stateVariables
-          } else {
-            var stateVarInfoList = operatorProperties.stateVariables
-              .filter(stateVar => stateVar.stateName == stateVarName)
-            if (stateVarInfoList.isEmpty &&
+          }
+
+          var stateVarInfoList = operatorProperties.stateVariables
+            .filter(stateVar => stateVar.stateName == stateVarName)
+          if (!TimerStateUtils.isTimerCFName(stateVarName) &&

Review Comment:
   Yeah, when `readRegisteredTimers` is set, `stateVarName` is set to the timer
column. From lines 333-334 above, we would already have gotten the correct
`stateVarInfoList`, so we won't need to assign it a dummy one like below.
   ```
             var stateVarInfoList = operatorProperties.stateVariables
               .filter(stateVar => stateVar.stateName == stateVarName)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]