ericm-db commented on code in PR #48205:
URL: https://github.com/apache/spark/pull/48205#discussion_r1772285154


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateVariableUtils.scala:
##########
@@ -43,6 +43,10 @@ object TransformWithStateVariableUtils {
   def getMapState(stateName: String, ttlEnabled: Boolean): 
TransformWithStateVariableInfo = {
     TransformWithStateVariableInfo(stateName, StateVariableType.MapState, 
ttlEnabled)
   }
+
+  def getTimerState(stateName: String): TransformWithStateVariableInfo = {
+    TransformWithStateVariableInfo(stateName, StateVariableType.ValueState, 
ttlEnabled = false)

Review Comment:
   Do we want this to be ValueState? Why not create another enum? 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##########
@@ -318,6 +318,22 @@ class DriverStatefulProcessorHandleImpl(timeMode: 
TimeMode, keyExprEnc: Expressi
     }
   }
 
+  private def addTimerColFamily(): Unit = {
+    val stateName = TimerStateUtils.getTimerStateVarName(timeMode.toString)
+    val timerEncoder = new TimerKeyEncoder(keyExprEnc)
+    val colFamilySchema = StateStoreColumnFamilySchemaUtils.
+      getTimerStateSchema(stateName, timerEncoder.schemaForKeyRow, 
timerEncoder.schemaForValueRow)
+    columnFamilySchemas.put(stateName, colFamilySchema)
+    val stateVariableInfo = 
TransformWithStateVariableUtils.getTimerState(stateName)
+    stateVariableInfos.put(stateName, stateVariableInfo)
+  }
+
+  // If timeMode is not None, add a timer column family schema to the operator 
metadata so that
+  // registered timers can be read using the state data source reader.

Review Comment:
   nit: Can we move this to the top of the class?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -153,10 +154,21 @@ class StateDataSource extends TableProvider with 
DataSourceRegister with Logging
           "No state variable names are defined for the transformWithState 
operator")
       }
 
+      val twsOperatorProperties = 
TransformWithStateOperatorProperties.fromJson(operatorProperties)
+      val timeMode = twsOperatorProperties.timeMode
+      if (sourceOptions.readRegisteredTimers && timeMode == 
TimeMode.None().toString) {
+        throw StateDataSourceErrors.invalidOptionValue(READ_REGISTERED_TIMERS,
+          "Registered timers are not available in TimeMode=None.")
+      }
+
       // if the state variable is not one of the defined/available state 
variables, then we
       // fail the query
-      val stateVarName = sourceOptions.stateVarName.get
-      val twsOperatorProperties = 
TransformWithStateOperatorProperties.fromJson(operatorProperties)
+      val stateVarName = if (sourceOptions.readRegisteredTimers) {
+        TimerStateUtils.getTimerStateVarName(timeMode)
+      } else {
+        sourceOptions.stateVarName.get
+      }

Review Comment:
   nit: Would it be possible to consolidate this into a single check for readRegisteredTimers?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala:
##########
@@ -481,4 +481,94 @@ class StateDataSourceTransformWithStateSuite extends 
StateStoreMetricsTest
       }
     }
   }

Review Comment:
   Can we add a test case with readRegisteredTimers = false to verify that the expected
exceptions are thrown?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to