jingz-db commented on code in PR #47933:
URL: https://github.com/apache/spark/pull/47933#discussion_r1746093738


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala:
##########
@@ -62,6 +74,22 @@ class TransformWithStateInPandasStateServer(
     new mutable.HashMap[String, (ValueState[Row], StructType,
       ExpressionEncoder.Deserializer[Row])]()
   }
+  // A map to store the list state name -> (list state, schema, list state row deserializer,
+  // list state row serializer) mapping.
+  private val listStates = if (listStatesMapForTest != null) {
+    listStatesMapForTest
+  } else {
+    new mutable.HashMap[String, (ListState[Row], StructType,
+      ExpressionEncoder.Deserializer[Row], ExpressionEncoder.Serializer[Row])]()
+  }
+  // A map to store the list state name -> iterator mapping. This is to keep track of the
+  // current iterator position for each list state in a grouping key in case user tries to fetch
+  // another list state before the current iterator is exhausted.
+  private var listStateIterators = if (listStateIteratorMapForTest != null) {
+    listStateIteratorMapForTest
+  } else {
+    new mutable.HashMap[String, Iterator[Row]]()

Review Comment:
   I'm also thinking out loud here, trying to reason about the lifecycle of this map across restarts. An instance of `TransformWithStateInPandasStateServer` is alive for the duration of one UDF call (i.e. one batch), and we only access the iterator within a single call of `handleInputRows`. So we should be fine in the following two scenarios:
   - Two calls to `listState.get()` on the same grouping key in two different batches. We expect each iterator to start from the beginning. Since different batches use different server instances, we get two iterators that each start from the beginning, which is expected.
   - Query restart. Similar to the above: after a restart we are in a different batch, so the iterator always starts from the beginning, which is also expected.
   
   So this implementation, which keeps a local map of iterators, should behave as expected.
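   The lifecycle argument above can be sketched as a small, Spark-free Scala model. Everything in it is hypothetical: `ToyStateServer`, `fetchChunk`, and the `Map[String, Seq[Int]]` standing in for persisted list state are illustrative names, not the PR's API, and dropping an iterator once it is exhausted is an assumption. It shows the two properties discussed. Within one instance, interleaved fetches of different list states resume where they left off. A fresh instance (a new batch, or a restart) starts every iterator from the beginning.

```scala
import scala.collection.mutable

// Hypothetical stand-in for TransformWithStateInPandasStateServer: one instance
// per UDF call/batch, with per-state-name iterators held only in memory.
class ToyStateServer(persisted: Map[String, Seq[Int]]) {
  // Mirrors listStateIterators: list state name -> partially consumed iterator.
  private val listStateIterators = mutable.HashMap.empty[String, Iterator[Int]]

  // Returns up to `chunkSize` elements of `stateName`, resuming an in-progress
  // iterator if one exists for this instance, otherwise starting a new one.
  def fetchChunk(stateName: String, chunkSize: Int): List[Int] = {
    val it = listStateIterators.getOrElseUpdate(
      stateName, persisted.getOrElse(stateName, Seq.empty).iterator)
    val buf = mutable.ListBuffer.empty[Int]
    while (buf.size < chunkSize && it.hasNext) buf += it.next()
    // Assumption: forget the iterator once exhausted, so a later get() restarts.
    if (!it.hasNext) listStateIterators.remove(stateName)
    buf.toList
  }
}

object ToyStateServerDemo {
  def main(args: Array[String]): Unit = {
    val persisted = Map("a" -> Seq(1, 2, 3, 4, 5), "b" -> Seq(10, 20, 30))

    val batch1 = new ToyStateServer(persisted)
    println(batch1.fetchChunk("a", 2)) // List(1, 2)
    println(batch1.fetchChunk("b", 2)) // List(10, 20)
    println(batch1.fetchChunk("a", 2)) // List(3, 4), resumed within the same instance

    // A new instance models a new batch or a query restart.
    val batch2 = new ToyStateServer(persisted)
    println(batch2.fetchChunk("a", 2)) // List(1, 2), starts from the beginning
  }
}
```

   Because the map lives only on the instance, no cleanup across batches is needed: discarding the server discards every in-progress iterator with it.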



-- 
This is an automated message from the Apache Git Service.