HeartSaVioR commented on code in PR #48110:
URL: https://github.com/apache/spark/pull/48110#discussion_r1770716704
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -178,44 +178,75 @@ class StatePartitionReader(
}
}
+ private def processListStateEntries(stateVarName: String): Iterator[InternalRow] = {
Review Comment:
nit: better to move these new private methods below the definition of `iter`.
I'm even OK with seeing them after `close()`. Let's keep the important,
caller-facing (public) members near the top.
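For illustration, a minimal sketch of the suggested ordering: public members (`iter`, `close()`) first, private helpers last. `ListStateReaderSketch` and the simplified `processListStateEntries` signature are hypothetical; the real reader works with Spark's `InternalRow` and a state store, not a `Map`.

```scala
// Hypothetical reader used only to illustrate member ordering; not the
// actual StatePartitionReader.
class ListStateReaderSketch(entries: Map[String, Seq[String]]) extends AutoCloseable {
  // Public, caller-facing members first.
  lazy val iter: Iterator[(String, String)] =
    entries.iterator.flatMap { case (k, vs) => processListStateEntries(k, vs) }

  private var closed = false

  override def close(): Unit = closed = true

  def isClosed: Boolean = closed

  // Private helpers last, after close(). Simplified, hypothetical signature:
  // emits one (key, element) pair per list element.
  private def processListStateEntries(key: String, values: Seq[String]): Iterator[(String, String)] =
    values.iterator.map(v => (key, v))
}
```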
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -178,44 +178,75 @@ class StatePartitionReader(
}
}
+ private def processListStateEntries(stateVarName: String): Iterator[InternalRow] = {
Review Comment:
Or, why not just move this out to `SchemaUtil` as well, like
`unifyMapStateRowPair`?
After moving it out, I don't think we need separate private
methods; they are simple enough to inline.
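A rough sketch of the idea, assuming a helper in `SchemaUtil` analogous to `unifyMapStateRowPair`. The name `unifyListStateRowPair`, the `FlatRow` case class, and the use of plain Scala values instead of `InternalRow` are all hypothetical simplifications; the point is that the reader can then inline a single `flatMap` call instead of keeping per-type private methods.

```scala
// Hypothetical stand-in for a flattened output row; the real code builds InternalRow.
final case class FlatRow(key: String, value: String, partitionId: Int)

object SchemaUtilSketch {
  // Hypothetical helper in the spirit of unifyMapStateRowPair: turns one
  // (key, list of values) entry into one output row per list element.
  def unifyListStateRowPair(key: String, values: Seq[String], partitionId: Int): Iterator[FlatRow] =
    values.iterator.map(v => FlatRow(key, v, partitionId))
}

object ReaderSketch {
  // With the helper in SchemaUtil, the reader needs only an inline flatMap.
  def rows(entries: Seq[(String, Seq[String])], partitionId: Int): Iterator[FlatRow] =
    entries.iterator.flatMap { case (k, vs) =>
      SchemaUtilSketch.unifyListStateRowPair(k, vs, partitionId)
    }
}
```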
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala:
##########
@@ -368,6 +385,20 @@ class StateDataSourceTransformWithStateSuite extends StateStoreMetricsTest
checkAnswer(valuesDf,
Seq(Row("session1", "group1"), Row("session1", "group2"),
Row("session1", "group4"),
Row("session2", "group1"), Row("session3", "group7")))
+
+ val flattenedStateReaderDf = spark.read
+ .format("statestore")
+ .option(StateSourceOptions.PATH, tempDir.getAbsolutePath)
+ .option(StateSourceOptions.STATE_VAR_NAME, "groupsListWithTTL")
+ .load()
+
+ val outputDf = flattenedStateReaderDf
Review Comment:
Shall we also verify the TTL values, as we do for the non-flattened mode?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala:
##########
@@ -213,13 +238,21 @@ object SchemaUtil {
stateVarType match {
case ValueState =>
- Seq("key", "single_value", "partition_id")
+ Seq("key", "value", "partition_id")
case ListState =>
- Seq("key", "list_value", "partition_id")
+ if (sourceOptions.flattenCollectionTypes) {
+ Seq("key", "value", "partition_id")
Review Comment:
Would we like to differentiate this from ValueState, e.g. with `list_element`?
Users could then tell from the schema alone that a key may have multiple
values.
I'm also OK with assuming that users know the state type of the
variable. Just my 2 cents, since we already do this for other cases (`list_value`,
`user_map_key/value`, `map_value`).
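To make the suggestion concrete, a minimal sketch of the column-name selection with the proposed `list_element` name in flattened mode. The `StateVariableType` trait and the `flattenCollectionTypes` parameter are hypothetical stand-ins for the real types and `sourceOptions.flattenCollectionTypes`; the non-flattened `list_value` branch is assumed from the removed line in the diff.

```scala
// Hypothetical stand-ins for the state variable types matched in SchemaUtil.
sealed trait StateVariableType
case object ValueState extends StateVariableType
case object ListState extends StateVariableType

object ColumnNamesSketch {
  def columnNames(stateVarType: StateVariableType, flattenCollectionTypes: Boolean): Seq[String] =
    stateVarType match {
      case ValueState => Seq("key", "value", "partition_id")
      case ListState =>
        // Proposed: a distinct name so the schema itself signals that a key
        // may appear in multiple rows, one per list element.
        if (flattenCollectionTypes) Seq("key", "list_element", "partition_id")
        // Assumed non-flattened layout: the whole list in a single column.
        else Seq("key", "list_value", "partition_id")
    }
}
```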
--
This is an automated message from the Apache Git Service.