jingz-db commented on code in PR #48005:
URL: https://github.com/apache/spark/pull/48005#discussion_r1828227151


##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -551,25 +550,103 @@ def transformWithStateUDF(
             # TODO(SPARK-49603) set the handle state in the lazily initialized iterator
 
             result = itertools.chain(*result_iter_list)
+            return result
+
+        def transformWithStateUDF(
+            statefulProcessorApiClient: StatefulProcessorApiClient,
+            key: Any,
+            inputRows: Iterator["PandasDataFrameLike"],
+        ) -> Iterator["PandasDataFrameLike"]:
+            handle = StatefulProcessorHandle(statefulProcessorApiClient)
+
+            if statefulProcessorApiClient.handle_state == StatefulProcessorHandleState.CREATED:
+                statefulProcessor.init(handle)
+                statefulProcessorApiClient.set_handle_state(
+                    StatefulProcessorHandleState.INITIALIZED
+                )
+
+            result = handle_data_with_timers(statefulProcessorApiClient, key, inputRows)
+            return result
+
+        def transformWithStateWithInitStateUDF(
+            statefulProcessorApiClient: StatefulProcessorApiClient,
+            key: Any,
+            inputRows: Iterator["PandasDataFrameLike"],
+            initialStates: Iterator["PandasDataFrameLike"] = None,
+        ) -> Iterator["PandasDataFrameLike"]:
+            """
+            UDF for TWS operator with non-empty initial states. Possible input combinations
+            of inputRows and initialStates iterator:
+            - Both `inputRows` and `initialStates` are non-empty: for the given key, both input rows
+              and initial states contain the grouping key, and both contain data.
+            - `initialStates` is non-empty, while `inputRows` is empty. For the given key, only
+              initial states contain the grouping key and data, and it is the first batch.
+            - `initialStates` is empty, while `inputRows` is not empty. For the given grouping key, only inputRows
+              contains the grouping key and data, and it is the first batch.
+            - `initialStates` is None, while `inputRows` is not empty. This is not the first batch. `initialStates`

Review Comment:
   Yes, an empty Dataset is different from None. In any batch after the first, `initialStates` will be None.
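
   To make the distinction concrete, here is a minimal sketch of how the four cases in the docstring could be told apart. It is not the code in this PR: `peek` and `classify` are hypothetical helpers, and plain Python iterators stand in for the iterators of pandas DataFrames.

```python
import itertools
from typing import Any, Iterator, Optional, Tuple


def peek(it: Iterator[Any]) -> Tuple[bool, Iterator[Any]]:
    """Return (is_empty, rebuilt_iterator) without losing the first element."""
    try:
        first = next(it)
    except StopIteration:
        return True, iter(())
    # Put the consumed element back in front of the remaining ones.
    return False, itertools.chain([first], it)


def classify(
    input_rows: Iterator[Any],
    initial_states: Optional[Iterator[Any]] = None,
) -> str:
    """Hypothetical helper: label which input combination we are in.

    Note: this sketch only returns a label. Real code would keep using the
    rebuilt iterators returned by peek(), since the originals are advanced.
    """
    # None (not an empty iterator) is what signals a non-first batch.
    if initial_states is None:
        return "non-first batch"

    init_empty, initial_states = peek(initial_states)
    rows_empty, input_rows = peek(input_rows)

    if not init_empty and not rows_empty:
        return "first batch: input rows and initial state"
    if not init_empty:
        return "first batch: initial state only"
    if not rows_empty:
        return "first batch: input rows only"
    return "first batch: no data for key"
```

   The point is that the `is None` check has to come before any emptiness check: an empty iterator still means "first batch, no initial state for this key", whereas None means the initial state phase is already over.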



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

