jingz-db commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1857466729


##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -1201,11 +1201,89 @@ def generate_data_batches(batches):
 
     def dump_stream(self, iterator, stream):
         """
-        Read through an iterator of (iterator of pandas DataFrame), serialize them to Arrow
-        RecordBatches, and write batches to stream.
+        Read through chained return results from a single partition of handleInputRows.

Review Comment:
   In my prior implementation, a correctness issue occurs when multiple keys 
expire on a single partition. For example, the test case 
`test_transform_with_state_init_state_with_timers` fails if we set the 
partition to "0".
   Previously, we called `get_expiry_timers_iterator()` and 
`handleExpiredTimer()` in `group_ops.py` inside the UDF, which is called once 
per key. So after we register the timer for key "0" inside 
`handleInitialState()`, we immediately enter `get_expiry_timers_iterator()`. At 
that point the UDF for key "3" has not been called yet, so the timer for key 
"3" is not registered. We therefore see only key "0" expire and get only 
`Row(id="0-expired")` in the output of the first batch. 
   When we later enter the UDF for key "3", the JVM returns no expired timers 
for it: `TransformWithStateInPandasStateServer` 
[here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala#L217)
 enforces that `expiryTimestampIter` is consumed only once per partition, and 
the iterator was already consumed for key "0". The expiry of key "3" is 
therefore missed, which is the correctness issue.
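
   The ordering problem can be reproduced with a small standalone sketch. It 
does not use Spark: `FakeStateServer`, `per_key_flow`, and `per_partition_flow` 
are hypothetical names invented for illustration, and the only behavior they 
model is the one described above, an expiry iterator that can be consumed once 
per partition. The second function assumes expiry is handled once after every 
key in the partition has registered its timer, which is my reading of the 
direction of this change rather than the actual PR code.

```python
class FakeStateServer:
    """Toy stand-in for the JVM side (hypothetical, not the Spark class).

    Models one rule only: expired timers can be fetched once per partition.
    """

    def __init__(self):
        self.timers = {}       # key -> expiry timestamp
        self.consumed = False  # True once the expiry iterator has been read

    def register_timer(self, key, expiry_ts):
        self.timers[key] = expiry_ts

    def get_expiry_timers(self, now):
        # A second call in the same partition returns nothing.
        if self.consumed:
            return []
        self.consumed = True
        return sorted(k for k, ts in self.timers.items() if ts <= now)


def per_key_flow(keys, now):
    """Prior behavior: register and check expiry inside each per-key call."""
    server = FakeStateServer()
    out = []
    for key in keys:
        server.register_timer(key, now - 1)  # timer is already expired
        out.extend(f"{k}-expired" for k in server.get_expiry_timers(now))
    return out


def per_partition_flow(keys, now):
    """Assumed fix: register for all keys first, then check expiry once."""
    server = FakeStateServer()
    for key in keys:
        server.register_timer(key, now - 1)
    return [f"{k}-expired" for k in server.get_expiry_timers(now)]


print(per_key_flow(["0", "3"], now=10))        # ['0-expired']
print(per_partition_flow(["0", "3"], now=10))  # ['0-expired', '3-expired']
```

   In the per-key flow the iterator is exhausted while only key "0" has a 
timer, so key "3" never shows up as expired, matching the missing row in the 
failing test.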



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

