jingz-db commented on code in PR #49560:
URL: https://github.com/apache/spark/pull/49560#discussion_r1953416549


##########
python/pyspark/sql/streaming/stateful_processor_util.py:
##########
@@ -16,13 +16,220 @@
 #
 
 from enum import Enum
+import itertools
+from typing import Any, Iterator, Optional, TYPE_CHECKING
+from pyspark.sql.streaming.stateful_processor_api_client import (
+    StatefulProcessorApiClient,
+    StatefulProcessorHandleState,
+)
+from pyspark.sql.streaming.stateful_processor import (
+    ExpiredTimerInfo,
+    StatefulProcessor,
+    StatefulProcessorHandle,
+    TimerValues,
+)
+
+if TYPE_CHECKING:
+    from pyspark.sql.pandas._typing import DataFrameLike as PandasDataFrameLike
 
 # This file places the utilities for transformWithStateInPandas; we have a 
separate file to avoid
 # putting internal classes to the stateful_processor.py file which contains 
public APIs.
 
 
 class TransformWithStateInPandasFuncMode(Enum):
+    """
+    Internal mode for python worker UDF mode for transformWithStateInPandas; 
external mode are in
+    `StatefulProcessorHandleState` for public use purposes.
+    """
+
     PROCESS_DATA = 1
     PROCESS_TIMER = 2
     COMPLETE = 3
     PRE_INIT = 4
+
+
+class TransformWithStateInPandasUdfUtils:
+    """
+    Internal Utility class used for python worker UDF for 
transformWithStateInPandas. This class is
+    shared for both classic and spark connect mode.
+    """
+
+    def __init__(self, stateful_processor: StatefulProcessor, time_mode: str):
+        self._stateful_processor = stateful_processor
+        self._time_mode = time_mode
+
+    def transformWithStateUDF(

Review Comment:
   Sure, let me add a few inline comments for reviewers!



##########
python/pyspark/sql/streaming/stateful_processor_util.py:
##########
@@ -16,13 +16,220 @@
 #
 
 from enum import Enum
+import itertools
+from typing import Any, Iterator, Optional, TYPE_CHECKING
+from pyspark.sql.streaming.stateful_processor_api_client import (
+    StatefulProcessorApiClient,
+    StatefulProcessorHandleState,
+)
+from pyspark.sql.streaming.stateful_processor import (
+    ExpiredTimerInfo,
+    StatefulProcessor,
+    StatefulProcessorHandle,
+    TimerValues,
+)
+
+if TYPE_CHECKING:
+    from pyspark.sql.pandas._typing import DataFrameLike as PandasDataFrameLike
 
 # This file places the utilities for transformWithStateInPandas; we have a 
separate file to avoid
 # putting internal classes to the stateful_processor.py file which contains 
public APIs.
 
 
 class TransformWithStateInPandasFuncMode(Enum):
+    """
+    Internal mode for python worker UDF mode for transformWithStateInPandas; 
external mode are in
+    `StatefulProcessorHandleState` for public use purposes.
+    """
+
     PROCESS_DATA = 1
     PROCESS_TIMER = 2
     COMPLETE = 3
     PRE_INIT = 4
+
+
+class TransformWithStateInPandasUdfUtils:
+    """
+    Internal Utility class used for python worker UDF for 
transformWithStateInPandas. This class is
+    shared for both classic and spark connect mode.
+    """
+
+    def __init__(self, stateful_processor: StatefulProcessor, time_mode: str):
+        self._stateful_processor = stateful_processor
+        self._time_mode = time_mode
+
+    def transformWithStateUDF(

Review Comment:
   Sure, let me add a few inline comments for reviewers!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to