HeartSaVioR commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1849528014


##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -502,53 +502,59 @@ def transformWithStateInPandas(
         if isinstance(outputStructType, str):
             outputStructType = cast(StructType, 
_parse_datatype_string(outputStructType))
 
-        def handle_data_with_timers(
+        def get_timestamps(
             statefulProcessorApiClient: StatefulProcessorApiClient,
-            key: Any,
-            inputRows: Iterator["PandasDataFrameLike"],
-        ) -> Iterator["PandasDataFrameLike"]:
-            statefulProcessorApiClient.set_implicit_key(key)
+        ) -> Tuple[int, int]:
             if timeMode != "none":
                 batch_timestamp = 
statefulProcessorApiClient.get_batch_timestamp()
                 watermark_timestamp = 
statefulProcessorApiClient.get_watermark_timestamp()
             else:
                 batch_timestamp = -1
                 watermark_timestamp = -1
-            # process with invalid expiry timer info and emit data rows
-            data_iter = statefulProcessor.handleInputRows(
-                key,
-                inputRows,
-                TimerValues(batch_timestamp, watermark_timestamp),
-                ExpiredTimerInfo(False),
-            )
-            
statefulProcessorApiClient.set_handle_state(StatefulProcessorHandleState.DATA_PROCESSED)
+            return batch_timestamp, watermark_timestamp
+
+        def handle_data_with_timers(
+            statefulProcessorApiClient: StatefulProcessorApiClient,
+            key: Any,
+            batch_timestamp: int,
+            watermark_timestamp: int,
+            inputRows: Optional[Iterator["PandasDataFrameLike"]] = None,
+        ) -> Iterator["PandasDataFrameLike"]:
+            statefulProcessorApiClient.set_implicit_key(key)
+            # process with data rows
+            if inputRows is not None:
+                data_iter = statefulProcessor.handleInputRows(
+                    key, inputRows, TimerValues(batch_timestamp, 
watermark_timestamp)
+                )
+                result_iter_list = [data_iter]
+                statefulProcessorApiClient.set_handle_state(
+                    StatefulProcessorHandleState.DATA_PROCESSED
+                )
+            else:
+                result_iter_list = []
 
-            if timeMode == "processingtime":
+            if timeMode.lower() == "processingtime":
                 expiry_list_iter = 
statefulProcessorApiClient.get_expiry_timers_iterator(
                     batch_timestamp
                 )
-            elif timeMode == "eventtime":
+            elif timeMode.lower() == "eventtime":
                 expiry_list_iter = 
statefulProcessorApiClient.get_expiry_timers_iterator(
                     watermark_timestamp
                 )
             else:
                 expiry_list_iter = iter([[]])
 
-            result_iter_list = [data_iter]
-            # process with valid expiry time info and with empty input rows,
-            # only timer related rows will be emitted
+            # process with expiry timers, only timer related rows will be 
emitted

Review Comment:
   I have confused about this every time. Is this relying on the behavior that 
expired timer will be removed so we won't list up the same timer as expired 
multiple times? This is very easy to be forgotten.
   
   If there is any way we can just move this out and do this after we process 
all input? Can this be done in 
transformWithStateUDF/transformWithStateWithInitStateUDF with key = null?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to