jingz-db commented on code in PR #48005:
URL: https://github.com/apache/spark/pull/48005#discussion_r1815788604


##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -505,22 +514,87 @@ def transformWithStateUDF(
 
             return result
 
-        if isinstance(outputStructType, str):
-            outputStructType = cast(StructType, 
_parse_datatype_string(outputStructType))
+        def transformWithStateWithInitStateUDF(
+            statefulProcessorApiClient: StatefulProcessorApiClient,
+            key: Any,
+            inputRows: Iterator["PandasDataFrameLike"],
+            initialStates: Iterator["PandasDataFrameLike"] = None
+        ) -> Iterator["PandasDataFrameLike"]:
+            """
+            UDF for TWS operator with non-empty initial states. Possible input 
combinations
+            of inputRows and initialStates iterator:
+            - Both `inputRows` and `initialStates` are non-empty: for the 
given key, both input rows
+              and initial states contains the grouping key.
+            - `InitialStates` is non-empty, while `initialStates` is empty. 
For the given key, only
+              initial states contains the key, and it is first batch.
+            - `initialStates` is empty, while `inputRows` is not empty. For 
the given key, only inputRows
+              contains the key, and it is first batch.
+            - `initialStates` is None, while `inputRows` is not empty. This is 
not first batch. `initialStates`
+              is initialized to the positional value as None.
+            """
+            handle = StatefulProcessorHandle(statefulProcessorApiClient)
+
+            if statefulProcessorApiClient.handle_state == 
StatefulProcessorHandleState.CREATED:
+                statefulProcessor.init(handle)
+                statefulProcessorApiClient.set_handle_state(
+                    StatefulProcessorHandleState.INITIALIZED
+                )
+
+            # only process initial state if first batch
+            is_first_batch = statefulProcessorApiClient.is_first_batch()
+            if is_first_batch and initialStates is not None:
+                seen_init_state_on_key = False
+                for cur_initial_state in initialStates:
+                    if seen_init_state_on_key:
+                        raise Exception(f"TransformWithStateWithInitState: 
Cannot have more "

Review Comment:
   I am removing this check because we will allow multiple value rows for the 
same grouping key. This is needed to integrate initial state handling with the 
state reader source: for flattened list/map state, the output DataFrame will 
contain multiple value rows with the same grouping key.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to