bogao007 commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1857583249


##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -1201,11 +1201,89 @@ def generate_data_batches(batches):
 
     def dump_stream(self, iterator, stream):
         """
-        Read through an iterator of (iterator of pandas DataFrame), serialize them to Arrow
-        RecordBatches, and write batches to stream.
+        Read through chained return results from a single partition of handleInputRows.

Review Comment:
   It might be better to document the structure of the input `iterator`, given 
that we have added a bunch of new objects to the UDF output. Either add it 
here or further down where `args` is defined.
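
A rough sketch of what such a note could capture, inferred only from how the diff unpacks the input (`y[0]` through `y[3]` and `t`). The helper `make_partition_output` and all string values are hypothetical stand-ins for the real DataFrames, API client, processor, and output type; this is not the PR's code.

```python
from itertools import tee


def make_partition_output():
    """Mimic the nested shape that dump_stream unpacks in the diff.

    iterator: iterable of x
      x: iterable of (y, t)
        y[0]: iterable of pandas DataFrames (plain strings here)
        y[1]: stateful processor API client
        y[2]: stateful processor
        y[3]: time mode
        t:    output type
    """
    # Hypothetical placeholder values, one entry per grouping key.
    y_key_a = (["df_a1", "df_a2"], "api_client", "processor", "processingTime")
    y_key_b = (["df_b1"], "api_client", "processor", "processingTime")
    return iter([[(y_key_a, "output_type")], [(y_key_b, "output_type")]])


# Same two comprehensions as in the diff, applied to the stand-in data.
cloned_iterator, result_iterator = tee(make_partition_output())
result = [(pdf, t) for x in cloned_iterator for y, t in x for pdf in y[0]]
args = [(y[1], y[2], t, y[3]) for x in result_iterator for y, t in x]
```

Under this assumed shape, `result` carries one `(DataFrame, output type)` pair per DataFrame, while `args` carries one tuple per key, which is why the diff can read the shared objects from `args[0]`.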



##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -1201,11 +1201,89 @@ def generate_data_batches(batches):
 
     def dump_stream(self, iterator, stream):
         """
-        Read through an iterator of (iterator of pandas DataFrame), serialize them to Arrow
-        RecordBatches, and write batches to stream.
+        Read through chained return results from a single partition of handleInputRows.

Review Comment:
   Thanks @jingz-db for the detailed explanation! Do you think we should 
add a test case where multiple keys expire in the same partition? For example, 
we could either set the number of partitions to 1 or increase the input so 
that it contains more keys.



##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -1201,11 +1201,89 @@ def generate_data_batches(batches):
 
     def dump_stream(self, iterator, stream):
         """
-        Read through an iterator of (iterator of pandas DataFrame), serialize them to Arrow
-        RecordBatches, and write batches to stream.
+        Read through chained return results from a single partition of handleInputRows.
+        For a single partition, after finish handling all input rows, we need to iterate
+        through all expired timers and handle them. We chain the results of handleInputRows
+        with handleExpiredTimer into a single iterator and dump the stream as arrow batches.
         """
-        result = [(b, t) for x in iterator for y, t in x for b in y]
-        super().dump_stream(result, stream)
+
+        from itertools import tee, chain
+        from pyspark.sql.streaming.stateful_processor_api_client import (
+            StatefulProcessorHandleState,
+        )
+        from pyspark.sql.streaming.stateful_processor import (
+            ExpiredTimerInfo,
+            TimerValues,
+        )
+
+        # Clone the original iterator to get additional args
+        cloned_iterator, result_iterator = tee(iterator)
+        result = [(pd, t) for x in cloned_iterator for y, t in x for pd in y[0]]
+        args = [(y[1], y[2], t, y[3]) for x in result_iterator for y, t in x]
+
+        # if num of keys is smaller than num of partitions, some partitions will have empty
+        # input rows; we do nothing for such partitions
+        if len(args) == 0:
+            return
+
+        # all keys on the same partition share the same args
+        statefulProcessorApiClient = args[0][0]
+        statefulProcessor = args[0][1]
+        outputType = args[0][2]
+        timeMode = args[0][3]
+
+        batch_timestamp, watermark_timestamp = statefulProcessorApiClient.get_timestamps()
+
+        result_iter_list = []
+        if timeMode.lower() == "processingtime":
+            expiry_list_iter = statefulProcessorApiClient.get_expiry_timers_iterator(
+                batch_timestamp
+            )
+        elif timeMode.lower() == "eventtime":
+            expiry_list_iter = statefulProcessorApiClient.get_expiry_timers_iterator(
+                watermark_timestamp
+            )
+        else:
+            expiry_list_iter = iter([[]])
+
+        def timer_iter_wrapper(func, *args, **kwargs):

Review Comment:
   Nit: can we move this method definition to the top of `dump_stream` to 
follow the same pattern in this file? This would also make the code easier to 
read.
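
To illustrate the suggested layout, here is a minimal sketch with the nested helper defined first and the main logic after it. The function `dump_results`, its parameters, and the body of `timer_iter_wrapper` are assumptions made for illustration; the diff shows only the helper's signature, so this is not the PR's implementation.

```python
from itertools import chain


def dump_results(input_results, expired_timers, handle_timer):
    # Helper defined at the top of the function, before the main logic.
    def timer_iter_wrapper(func, *args, **kwargs):
        # Assumed body: defer the call until the chained iterator reaches it.
        yield from func(*args, **kwargs)

    # One lazy iterator per expired (key, timestamp) pair.
    timer_iters = [
        timer_iter_wrapper(handle_timer, key, ts) for key, ts in expired_timers
    ]
    # Input-row results come first, followed by the timer results in order.
    return list(chain(input_results, *timer_iters))


# Hypothetical usage with plain strings in place of pandas DataFrames.
output = dump_results(
    iter(["row1", "row2"]),
    [("a", 10), ("b", 20)],
    lambda key, ts: iter([f"{key}@{ts}"]),
)
```

Because the wrapper is a generator, each timer handler runs only when the chained iterator is consumed past the input-row results.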



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

