bogao007 commented on code in PR #48005:
URL: https://github.com/apache/spark/pull/48005#discussion_r1809236355


##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -505,22 +514,87 @@ def transformWithStateUDF(
 
             return result
 
-        if isinstance(outputStructType, str):
-            outputStructType = cast(StructType, 
_parse_datatype_string(outputStructType))
+        def transformWithStateWithInitStateUDF(
+            statefulProcessorApiClient: StatefulProcessorApiClient,
+            key: Any,
+            inputRows: Iterator["PandasDataFrameLike"],
+            initialStates: Iterator["PandasDataFrameLike"] = None
+        ) -> Iterator["PandasDataFrameLike"]:
+            """
+            UDF for TWS operator with non-empty initial states. Possible input 
combinations
+            of inputRows and initialStates iterator:
+            - Both `inputRows` and `initialStates` are non-empty: for the 
given key, both input rows
+              and initial states contains the grouping key.
+            - `InitialStates` is non-empty, while `initialStates` is empty. 
For the given key, only
+              initial states contains the key, and it is first batch.
+            - `initialStates` is empty, while `inputRows` is not empty. For 
the given key, only inputRows
+              contains the key, and it is first batch.
+            - `initialStates` is None, while `inputRows` is not empty. This is 
not first batch. `initialStates`
+              is initialized to the positional value as None.
+            """
+            handle = StatefulProcessorHandle(statefulProcessorApiClient)
+
+            if statefulProcessorApiClient.handle_state == 
StatefulProcessorHandleState.CREATED:
+                statefulProcessor.init(handle)
+                statefulProcessorApiClient.set_handle_state(
+                    StatefulProcessorHandleState.INITIALIZED
+                )
+
+            # only process initial state if first batch
+            is_first_batch = statefulProcessorApiClient.is_first_batch()
+            if is_first_batch and initialStates is not None:
+                seen_init_state_on_key = False
+                for cur_initial_state in initialStates:
+                    if seen_init_state_on_key:
+                        raise Exception(f"TransformWithStateWithInitState: 
Cannot have more "

Review Comment:
   Nit: let's include the TODO for classifying the errors here.



##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -505,22 +514,87 @@ def transformWithStateUDF(
 
             return result
 
-        if isinstance(outputStructType, str):
-            outputStructType = cast(StructType, 
_parse_datatype_string(outputStructType))
+        def transformWithStateWithInitStateUDF(
+            statefulProcessorApiClient: StatefulProcessorApiClient,
+            key: Any,
+            inputRows: Iterator["PandasDataFrameLike"],
+            initialStates: Iterator["PandasDataFrameLike"] = None
+        ) -> Iterator["PandasDataFrameLike"]:
+            """
+            UDF for TWS operator with non-empty initial states. Possible input 
combinations
+            of inputRows and initialStates iterator:
+            - Both `inputRows` and `initialStates` are non-empty: for the 
given key, both input rows
+              and initial states contains the grouping key.
+            - `InitialStates` is non-empty, while `initialStates` is empty. 
For the given key, only
+              initial states contains the key, and it is first batch.
+            - `initialStates` is empty, while `inputRows` is not empty. For 
the given key, only inputRows
+              contains the key, and it is first batch.

Review Comment:
   Ditto.



##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -505,22 +514,87 @@ def transformWithStateUDF(
 
             return result
 
-        if isinstance(outputStructType, str):
-            outputStructType = cast(StructType, 
_parse_datatype_string(outputStructType))
+        def transformWithStateWithInitStateUDF(
+            statefulProcessorApiClient: StatefulProcessorApiClient,
+            key: Any,
+            inputRows: Iterator["PandasDataFrameLike"],
+            initialStates: Iterator["PandasDataFrameLike"] = None
+        ) -> Iterator["PandasDataFrameLike"]:
+            """
+            UDF for TWS operator with non-empty initial states. Possible input 
combinations
+            of inputRows and initialStates iterator:
+            - Both `inputRows` and `initialStates` are non-empty: for the 
given key, both input rows
+              and initial states contains the grouping key.
+            - `InitialStates` is non-empty, while `initialStates` is empty. 
For the given key, only
+              initial states contains the key, and it is first batch.

Review Comment:
   Ditto.



##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -505,22 +514,87 @@ def transformWithStateUDF(
 
             return result
 
-        if isinstance(outputStructType, str):
-            outputStructType = cast(StructType, 
_parse_datatype_string(outputStructType))
+        def transformWithStateWithInitStateUDF(
+            statefulProcessorApiClient: StatefulProcessorApiClient,
+            key: Any,
+            inputRows: Iterator["PandasDataFrameLike"],
+            initialStates: Iterator["PandasDataFrameLike"] = None
+        ) -> Iterator["PandasDataFrameLike"]:
+            """
+            UDF for TWS operator with non-empty initial states. Possible input 
combinations
+            of inputRows and initialStates iterator:
+            - Both `inputRows` and `initialStates` are non-empty: for the 
given key, both input rows
+              and initial states contains the grouping key.

Review Comment:
   Nit: Let's make it clearer in the comments here.
   ```suggestion
               - Both `inputRows` and `initialStates` are non-empty: for the 
given grouping key, both input rows
                 and initial states contains the data.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to