jingz-db commented on code in PR #47878:
URL: https://github.com/apache/spark/pull/47878#discussion_r1762064480
##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -501,7 +502,47 @@ def transformWithStateUDF(
)
statefulProcessorApiClient.set_implicit_key(key)
- result = statefulProcessor.handleInputRows(key, inputRows)
+
+            batch_timestamp = statefulProcessorApiClient.get_batch_timestamp()
+            watermark_timestamp = statefulProcessorApiClient.get_watermark_timestamp()
Review Comment:
We need some values to initialize the `TimerValues` passed into
`handleInputRows`. On the Scala side, we always pass the real timestamps into
`TimerValues` even when no timer is defined:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala#L250
It is unlikely that users will read `TimerValues` if they have no timer
registered, but my original intention was to align the behavior with the Scala
side. So the trade-off here is between saving a call and staying aligned with
Scala. I don't have a strong opinion on which is better. Which approach do you
prefer?
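To make the trade-off concrete, here is a minimal sketch of the two options. The client class and method bodies below are stand-ins for illustration only (`statefulProcessorApiClient` is mocked; the timestamp values are made up), not the real PySpark API:

```python
class FakeApiClient:
    """Stand-in for statefulProcessorApiClient; counts round-trip calls."""

    def __init__(self):
        self.calls = 0

    def get_batch_timestamp(self):
        self.calls += 1
        return 1_000  # arbitrary illustrative value

    def get_watermark_timestamp(self):
        self.calls += 1
        return 900  # arbitrary illustrative value


def eager_timer_values(client):
    # Option A: always fetch both timestamps up front, mirroring the
    # Scala side, even if the user never registered a timer.
    return (client.get_batch_timestamp(), client.get_watermark_timestamp())


class LazyTimerValues:
    # Option B: defer each fetch until the user actually asks,
    # saving the calls when no timer is registered.
    def __init__(self, client):
        self._client = client

    def batch_timestamp(self):
        return self._client.get_batch_timestamp()

    def watermark_timestamp(self):
        return self._client.get_watermark_timestamp()
```

With the eager option every invocation pays two calls; with the lazy option a processor that never touches timers pays zero, at the cost of diverging from the Scala initialization path.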
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]