jingz-db commented on code in PR #47878:
URL: https://github.com/apache/spark/pull/47878#discussion_r1813171890
##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -501,7 +502,41 @@ def transformWithStateUDF(
)
statefulProcessorApiClient.set_implicit_key(key)
- result = statefulProcessor.handleInputRows(key, inputRows)
+
+ if timeMode != "none":
+ batch_timestamp = statefulProcessorApiClient.get_batch_timestamp()
+ watermark_timestamp = statefulProcessorApiClient.get_watermark_timestamp()
+ else:
+ batch_timestamp = -1
+ watermark_timestamp = -1
+ # process with invalid expiry timer info and emit data rows
+ data_iter = statefulProcessor.handleInputRows(
+ key, inputRows, TimerValues(batch_timestamp, watermark_timestamp), ExpiredTimerInfo(False))
+ statefulProcessorApiClient.set_handle_state(
+ StatefulProcessorHandleState.DATA_PROCESSED
+ )
+
+ if timeMode == "processingtime":
+ expiry_list_iter = statefulProcessorApiClient.get_expiry_timers_iterator(batch_timestamp)
+ elif timeMode == "eventtime":
+ expiry_list_iter = statefulProcessorApiClient.get_expiry_timers_iterator(watermark_timestamp)
+ else:
+ expiry_list_iter = []
+
+ result_iter_list = [data_iter]
+ # process with valid expiry time info and with empty input rows,
+ # only timer related rows will be emitted
+ for expiry_list in expiry_list_iter:
+ for key_obj, expiry_timestamp in expiry_list:
+ if (timeMode == "processingtime" and expiry_timestamp < batch_timestamp) or\
Review Comment:
You are correct about this. Thanks for noticing the redundant check. Removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]