HeartSaVioR commented on code in PR #47978:
URL: https://github.com/apache/spark/pull/47978#discussion_r1746477607


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -166,16 +184,22 @@ class StatePartitionReader(
         stateVariableInfoOpt match {
           case Some(stateVarInfo) =>
             val stateVarType = stateVarInfo.stateVariableType
-            val hasTTLEnabled = stateVarInfo.ttlEnabled
 
             stateVarType match {
               case StateVariableType.ValueState =>
-                if (hasTTLEnabled) {
-                  SchemaUtil.unifyStateRowPairWithTTL((pair.key, pair.value), 
valueSchema,
-                    partition.partition)
-                } else {
-                  SchemaUtil.unifyStateRowPair((pair.key, pair.value), 
partition.partition)
+                SchemaUtil.unifyStateRowPair((pair.key, pair.value), 
partition.partition)
+
+              case StateVariableType.ListState =>
+                val key = pair.key
+                val result = store.valuesIterator(key, stateVarName)
+                var unsafeRowArr: Seq[UnsafeRow] = Seq.empty

Review Comment:
   Worth noting that we are materializing every element in the list, so despite 
we avoid memory issue in ListState, the problem will pop up when reading 
ListState via state data source reader.
   
   I guess it's probably worth reconsidering the schema and UX. e.g. having 
`index` and `value` as columns for the value of list state and allow multiple 
rows for the same state value. (You've used explode in the example but we could 
just provide that result directly.) You'll also have the same issue with 
MapType as well, so worth considering.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala:
##########
@@ -91,23 +90,22 @@ object SchemaUtil {
       "change_type" -> classOf[StringType],
       "key" -> classOf[StructType],
       "value" -> classOf[StructType],
-      "partition_id" -> classOf[IntegerType],
-      "expiration_timestamp" -> classOf[LongType])
+      "single_value" -> classOf[StructType],
+      "list_value" -> classOf[ArrayType],
+      "partition_id" -> classOf[IntegerType])
 
     val expectedFieldNames = if (sourceOptions.readChangeFeed) {
       Seq("batch_id", "change_type", "key", "value", "partition_id")
     } else if (transformWithStateVariableInfoOpt.isDefined) {
       val stateVarInfo = transformWithStateVariableInfoOpt.get
-      val hasTTLEnabled = stateVarInfo.ttlEnabled
       val stateVarType = stateVarInfo.stateVariableType
 
       stateVarType match {
         case StateVariableType.ValueState =>

Review Comment:
   Now the schema is quite dynamic - have we run `explain` to check the schema 
is captured in dry-run as well?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -166,16 +184,22 @@ class StatePartitionReader(
         stateVariableInfoOpt match {
           case Some(stateVarInfo) =>
             val stateVarType = stateVarInfo.stateVariableType
-            val hasTTLEnabled = stateVarInfo.ttlEnabled

Review Comment:
   Is it intentional to remove out the readability of TTL functionality for 
value state? Just wanted to know whether we give up functionality in certain 
reason, or you want to defer this till we have every types in support range and 
think about UX which works for every types.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to