HeartSaVioR commented on code in PR #47978:
URL: https://github.com/apache/spark/pull/47978#discussion_r1746477607
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -166,16 +184,22 @@ class StatePartitionReader(
    stateVariableInfoOpt match {
      case Some(stateVarInfo) =>
        val stateVarType = stateVarInfo.stateVariableType
-        val hasTTLEnabled = stateVarInfo.ttlEnabled
        stateVarType match {
          case StateVariableType.ValueState =>
-            if (hasTTLEnabled) {
-              SchemaUtil.unifyStateRowPairWithTTL((pair.key, pair.value), valueSchema,
-                partition.partition)
-            } else {
-              SchemaUtil.unifyStateRowPair((pair.key, pair.value), partition.partition)
-            }
+            SchemaUtil.unifyStateRowPair((pair.key, pair.value), partition.partition)
+
+          case StateVariableType.ListState =>
+            val key = pair.key
+            val result = store.valuesIterator(key, stateVarName)
+            var unsafeRowArr: Seq[UnsafeRow] = Seq.empty
Review Comment:
Worth noting that we are materializing every element of the list here, so
although we avoid the memory issue inside ListState itself, the same problem
will pop up when reading ListState via the state data source reader.
It's probably worth reconsidering the schema and UX. e.g. expose `index` and
`value` as columns for the value of list state and allow multiple rows for the
same state key. (You've used explode in the example, but we could just provide
that result directly.) The same issue applies to MapType as well, so worth
considering there too.
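A minimal sketch of the UX difference, assuming the `statestore` format and a
`stateVarName` option along the lines of this PR (the checkpoint path and
column names are illustrative, not what the PR implements):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode}

val spark = SparkSession.builder().getOrCreate()

// Today's shape: one row per grouping key, with the whole list
// materialized into a single array column ("list_value").
val listDf = spark.read
  .format("statestore")
  .option("stateVarName", "listState")  // option name: assumption per this PR
  .load("<checkpoint-dir>")             // placeholder checkpoint location

// The caller has to explode the array into one row per element themselves.
listDf.select(col("key"), explode(col("list_value")).as("list_element"))

// Proposed shape: the source emits (key, index, value, partition_id)
// directly, one row per list element, so neither the explode nor the
// per-key materialization on the read path is needed.
```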
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala:
##########
@@ -91,23 +90,22 @@ object SchemaUtil {
      "change_type" -> classOf[StringType],
      "key" -> classOf[StructType],
      "value" -> classOf[StructType],
-      "partition_id" -> classOf[IntegerType],
-      "expiration_timestamp" -> classOf[LongType])
+      "single_value" -> classOf[StructType],
+      "list_value" -> classOf[ArrayType],
+      "partition_id" -> classOf[IntegerType])
    val expectedFieldNames = if (sourceOptions.readChangeFeed) {
      Seq("batch_id", "change_type", "key", "value", "partition_id")
    } else if (transformWithStateVariableInfoOpt.isDefined) {
      val stateVarInfo = transformWithStateVariableInfoOpt.get
-      val hasTTLEnabled = stateVarInfo.ttlEnabled
      val stateVarType = stateVarInfo.stateVariableType
      stateVarType match {
        case StateVariableType.ValueState =>
Review Comment:
Now the schema is quite dynamic. Have we run `explain` to check that the
schema is resolved correctly in a dry run (i.e. at analysis time, without
executing the scan) as well?
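Something like this is what I mean by a dry run, reusing the `spark` session
from the earlier sketch (option and path names illustrative):

```scala
// Resolve the relation and inspect schema/plan without scanning any state.
val df = spark.read
  .format("statestore")
  .option("stateVarName", "listState")  // option name: assumption per this PR
  .load("<checkpoint-dir>")

df.printSchema()  // expect a list_value array column for ListState variables
df.explain()      // the schema must already be fixed at analysis time
```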
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -166,16 +184,22 @@ class StatePartitionReader(
    stateVariableInfoOpt match {
      case Some(stateVarInfo) =>
        val stateVarType = stateVarInfo.stateVariableType
-        val hasTTLEnabled = stateVarInfo.ttlEnabled
Review Comment:
Is it intentional to remove the ability to read TTL information for value
state? Just wanted to know whether we are giving up this functionality for a
specific reason, or whether you want to defer it until every type is supported
and then design a UX that works for all types.
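For context, a sketch of what the TTL read path used to expose, per the
removed `"expiration_timestamp" -> classOf[LongType]` entry in `SchemaUtil`
above (state variable name and path are illustrative; `spark` as before):

```scala
// Before this change, reading a TTL-enabled value state surfaced the
// expiration as a top-level long column; a projection like this goes away:
spark.read
  .format("statestore")
  .option("stateVarName", "valueStateWithTTL")  // illustrative variable name
  .load("<checkpoint-dir>")
  .select("key", "value", "expiration_timestamp")
```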
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]