jingz-db commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1859577741
##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -1201,11 +1201,89 @@ def generate_data_batches(batches):
     def dump_stream(self, iterator, stream):
         """
-        Read through an iterator of (iterator of pandas DataFrame), serialize them to Arrow
-        RecordBatches, and write batches to stream.
+        Read through chained return results from a single partition of handleInputRows.
+        For a single partition, after finish handling all input rows, we need to iterate
+        through all expired timers and handle them. We chain the results of handleInputRows
+        with handleExpiredTimer into a single iterator and dump the stream as arrow batches.
         """
-        result = [(b, t) for x in iterator for y, t in x for b in y]
-        super().dump_stream(result, stream)
+
+        from itertools import tee, chain
+        from pyspark.sql.streaming.stateful_processor_api_client import (
+            StatefulProcessorHandleState,
+        )
+        from pyspark.sql.streaming.stateful_processor import (
+            ExpiredTimerInfo,
+            TimerValues,
+        )
+
+        # Clone the original iterator to get additional args
+        cloned_iterator, result_iterator = tee(iterator)
+        result = [(pd, t) for x in cloned_iterator for y, t in x for pd in y[0]]
+        args = [(y[1], y[2], t, y[3]) for x in result_iterator for y, t in x]
+
+        # if num of keys is smaller than num of partitions, some partitions will have empty
+        # input rows; we do nothing for such partitions
+        if len(args) == 0:
+            return
+
+        # all keys on the same partition share the same args
+        statefulProcessorApiClient = args[0][0]
+        statefulProcessor = args[0][1]
+        outputType = args[0][2]
+        timeMode = args[0][3]
+
+        batch_timestamp, watermark_timestamp = statefulProcessorApiClient.get_timestamps()
+
+        result_iter_list = []
+        if timeMode.lower() == "processingtime":
+            expiry_list_iter = statefulProcessorApiClient.get_expiry_timers_iterator(
+                batch_timestamp
+            )
+        elif timeMode.lower() == "eventtime":
+            expiry_list_iter = statefulProcessorApiClient.get_expiry_timers_iterator(
+                watermark_timestamp
+            )
+        else:
+            expiry_list_iter = iter([[]])
+
+        def timer_iter_wrapper(func, *args, **kwargs):
Review Comment:
Moved this to just below where `statefulProcessorApiClient` is initialized,
since we need to access that object from `timer_iter_wrapper`.
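
For context, a minimal runnable sketch of the pattern this comment describes:
defining the wrapper after the client variable is bound lets the closure
capture that object from the enclosing scope, and the wrapped timer results
can then be chained onto the input-row results. All names below
(`FakeApiClient`, `dump_stream_sketch`, etc.) are hypothetical stand-ins for
illustration, not the actual PR code.

    from itertools import chain

    # Hypothetical stand-in for the API client; not the PR's real class.
    class FakeApiClient:
        def __init__(self):
            self.handle_state = "INITIALIZED"

    def dump_stream_sketch(input_row_results, expired_timer_results):
        api_client = FakeApiClient()

        # Defined *after* api_client is bound: the closure reads it from the
        # enclosing scope instead of receiving it as an argument.
        def timer_iter_wrapper(timer_iter):
            for item in timer_iter:
                yield item
            # The captured client is used once timer handling is finished.
            api_client.handle_state = "TIMER_PROCESSED"

        # Chain per-key input-row results with the wrapped expired-timer
        # results into a single iterator, as the new docstring describes.
        return chain(input_row_results, timer_iter_wrapper(expired_timer_results))

    # Example usage: list(dump_stream_sketch([1, 2], [3])) -> [1, 2, 3]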
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]