jingz-db commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1857426167
##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -502,53 +502,59 @@ def transformWithStateInPandas(
if isinstance(outputStructType, str):
outputStructType = cast(StructType,
_parse_datatype_string(outputStructType))
- def handle_data_with_timers(
+ def get_timestamps(
statefulProcessorApiClient: StatefulProcessorApiClient,
- key: Any,
- inputRows: Iterator["PandasDataFrameLike"],
- ) -> Iterator["PandasDataFrameLike"]:
- statefulProcessorApiClient.set_implicit_key(key)
+ ) -> Tuple[int, int]:
if timeMode != "none":
batch_timestamp =
statefulProcessorApiClient.get_batch_timestamp()
watermark_timestamp =
statefulProcessorApiClient.get_watermark_timestamp()
else:
batch_timestamp = -1
watermark_timestamp = -1
- # process with invalid expiry timer info and emit data rows
- data_iter = statefulProcessor.handleInputRows(
- key,
- inputRows,
- TimerValues(batch_timestamp, watermark_timestamp),
- ExpiredTimerInfo(False),
- )
-
statefulProcessorApiClient.set_handle_state(StatefulProcessorHandleState.DATA_PROCESSED)
+ return batch_timestamp, watermark_timestamp
+
+ def handle_data_with_timers(
+ statefulProcessorApiClient: StatefulProcessorApiClient,
+ key: Any,
+ batch_timestamp: int,
+ watermark_timestamp: int,
+ inputRows: Optional[Iterator["PandasDataFrameLike"]] = None,
+ ) -> Iterator["PandasDataFrameLike"]:
+ statefulProcessorApiClient.set_implicit_key(key)
+ # process with data rows
+ if inputRows is not None:
+ data_iter = statefulProcessor.handleInputRows(
+ key, inputRows, TimerValues(batch_timestamp,
watermark_timestamp)
+ )
+ result_iter_list = [data_iter]
+ statefulProcessorApiClient.set_handle_state(
+ StatefulProcessorHandleState.DATA_PROCESSED
+ )
+ else:
+ result_iter_list = []
- if timeMode == "processingtime":
+ if timeMode.lower() == "processingtime":
expiry_list_iter =
statefulProcessorApiClient.get_expiry_timers_iterator(
batch_timestamp
)
- elif timeMode == "eventtime":
+ elif timeMode.lower() == "eventtime":
expiry_list_iter =
statefulProcessorApiClient.get_expiry_timers_iterator(
watermark_timestamp
)
else:
expiry_list_iter = iter([[]])
- result_iter_list = [data_iter]
- # process with valid expiry time info and with empty input rows,
- # only timer related rows will be emitted
+ # process with expiry timers, only timer related rows will be
emitted
Review Comment:
Thanks so much for catching this! I had introduced a serious correctness bug in my
prior timer implementation. I have now moved all timer-handling code into
`serializer.py`, where expired timers are processed per partition.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]